Passed
Push — master ( 2f994c...020f3a )
by Olivier
03:00
created

asyncua.sync.Client.load_data_type_definitions()   A

Complexity

Conditions 1

Size

Total Lines 3
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 3
nop 2
dl 0
loc 3
rs 10
c 0
b 0
f 0
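"""
Synchronous API of asyncua.

Blocking wrappers around the async client, server, node and subscription
objects; the async code runs on an event loop in a background thread.
"""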
4
import asyncio
5
from threading import Thread, Condition
6
import logging
7
8
from asyncua import ua
9
from asyncua import client
10
from asyncua import server
11
from asyncua.common import node
12
from asyncua.common import subscription, shortcuts
13
14
logger = logging.getLogger(__name__)
15
16
17
class ThreadLoopNotRunning(Exception):
    """Raised when a coroutine is posted to a ThreadLoop whose event loop is not running."""
19
20
21
class ThreadLoop(Thread):
    """
    Thread that owns an asyncio event loop and runs it until stopped.

    Other threads hand coroutines over with `post`, which blocks until the
    coroutine has finished on the loop thread. The object can be used as a
    context manager: entering starts the thread, leaving stops it.
    """

    def __init__(self):
        super().__init__()
        self.loop = None
        self._cond = Condition()
        # Set by the loop thread once the loop is executing callbacks; lets
        # start() tell the real notification from a spurious wakeup.
        self._loop_ready = False

    def start(self):
        """Start the thread and block until its event loop is running."""
        with self._cond:
            # The lock is held while the thread starts, so the notification
            # cannot be delivered before we are waiting for it.
            super().start()
            self._cond.wait_for(lambda: self._loop_ready)

    def run(self):
        """Thread body: create the event loop and run it until `stop` is called."""
        self.loop = asyncio.new_event_loop()
        logger.debug("Threadloop: %s", self.loop)
        # Executed as the first callback of the loop, i.e. once it really runs.
        self.loop.call_soon_threadsafe(self._notify_start)
        self.loop.run_forever()

    def _notify_start(self):
        """Wake up the thread blocked in `start`."""
        with self._cond:
            self._loop_ready = True
            self._cond.notify_all()

    def stop(self):
        """
        Stop the event loop, wait for the thread to end and close the loop.

        Does nothing when the loop was never started or is already closed,
        so calling it more than once is harmless.
        """
        if self.loop is None or self.loop.is_closed():
            return
        self.loop.call_soon_threadsafe(self.loop.stop)
        self.join()
        self.loop.close()

    def post(self, coro):
        """
        Run `coro` on the loop thread and return its result.

        Blocks the calling thread until the coroutine is done; an exception
        raised by the coroutine is re-raised here.

        Raises:
            ThreadLoopNotRunning: if the event loop is not running.
        """
        if not self.loop or not self.loop.is_running():
            raise ThreadLoopNotRunning(f"could not post {coro}")
        future = asyncio.run_coroutine_threadsafe(coro, loop=self.loop)
        return future.result()

    def __enter__(self):
        self.start()
        return self

    def __exit__(self, exc_t, exc_v, trace):
        self.stop()
59
60
61
def syncmethod(func):
    """
    Decorator turning a stub method into a blocking call.

    The decorated function body is never executed. Instead the coroutine
    method with the same name is looked up on `self.aio_obj`, run on
    `self.tloop` and its result returned. Sync `Node` arguments (also when
    they are items of a list or tuple argument) are replaced by the async
    nodes they wrap, and async nodes, event generators and subscriptions in
    the result are wrapped into their sync counterparts.
    """
    from functools import wraps  # local import keeps this block self-contained

    def _unwrap(value):
        # Replace sync Node wrappers by the async objects the async API expects.
        if isinstance(value, Node):
            return value.aio_obj
        # Only rebuild a sequence when it really holds a Node, so any other
        # list/tuple argument is passed through as the very same object.
        if isinstance(value, (list, tuple)) and any(isinstance(item, Node) for item in value):
            items = [item.aio_obj if isinstance(item, Node) else item for item in value]
            return items if isinstance(value, list) else tuple(items)
        return value

    @wraps(func)  # keep the stub's name and docstring on the wrapper
    def wrapper(self, *args, **kwargs):
        aio_args = [_unwrap(arg) for arg in args]
        aio_kwargs = {key: _unwrap(val) for key, val in kwargs.items()}
        aio_func = getattr(self.aio_obj, func.__name__)
        result = self.tloop.post(aio_func(*aio_args, **aio_kwargs))
        if isinstance(result, node.Node):
            return Node(self.tloop, result)
        # A list is treated as a list of nodes when its first item is one.
        if isinstance(result, list) and len(result) > 0 and isinstance(result[0], node.Node):
            return [Node(self.tloop, i) for i in result]
        if isinstance(result, server.event_generator.EventGenerator):
            return EventGenerator(self.tloop, result)
        if isinstance(result, subscription.Subscription):
            return Subscription(self.tloop, result)
        return result

    return wrapper
83
84
85
class _SubHandler:
    """Adapter handed to the async subscription; relays notifications to the user's sync handler."""

    def __init__(self, tloop, sync_handler):
        self.sync_handler = sync_handler
        self.tloop = tloop

    def datachange_notification(self, node, val, data):
        """Wrap the async node into a sync Node and pass the data change on."""
        sync_node = Node(self.tloop, node)
        self.sync_handler.datachange_notification(sync_node, val, data)

    def event_notification(self, event):
        """Pass the event on to the sync handler unchanged."""
        handler = self.sync_handler
        handler.event_notification(event)
95
96
97
class Client:
    """
    Blocking wrapper around the async `client.Client`.

    The async client is stored in `aio_obj` and driven through `tloop`.
    When no thread loop is passed in, the client creates and starts its own
    and stops it again on disconnect. Methods decorated with `syncmethod`
    are stubs: the call is forwarded to the async method of the same name.
    """

    def __init__(self, url: str, timeout: int = 4, tloop=None):
        self.tloop = tloop
        self.close_tloop = False  # True only when the loop was created here
        if not self.tloop:
            self.tloop = ThreadLoop()
            self.tloop.start()
            self.close_tloop = True
        try:
            self.aio_obj = client.Client(url, timeout, loop=self.tloop.loop)
            self.nodes = Shortcuts(self.tloop, self.aio_obj.uaclient)
        except Exception:
            # Nobody gets a reference to a half-built client, so a loop
            # thread started above would otherwise never be stopped.
            if self.close_tloop:
                self.tloop.stop()
            raise

    def __str__(self):
        return "Sync" + self.aio_obj.__str__()
    __repr__ = __str__

    @syncmethod
    def connect(self):
        pass

    def disconnect(self):
        """Disconnect the async client and stop the thread loop if this client owns it."""
        try:
            self.tloop.post(self.aio_obj.disconnect())
        finally:
            # Stop an owned loop even when disconnecting failed, otherwise
            # its thread keeps running.
            if self.close_tloop:
                self.tloop.stop()

    def set_user(self, username: str):
        self.aio_obj.set_user(username)

    def set_password(self, pwd: str):
        self.aio_obj.set_password(pwd)

    @syncmethod
    def load_type_definitions(self, nodes=None):
        pass

    @syncmethod
    def load_data_type_definitions(self, node=None):
        pass

    @syncmethod
    def set_security(self):
        pass

    @syncmethod
    def load_enums(self):
        pass

    def create_subscription(self, period, handler):
        """Create a subscription whose notifications are relayed to the sync `handler`."""
        coro = self.aio_obj.create_subscription(period, _SubHandler(self.tloop, handler))
        aio_sub = self.tloop.post(coro)
        return Subscription(self.tloop, aio_sub)

    @syncmethod
    def get_namespace_index(self, url):
        pass

    def get_node(self, nodeid):
        """Return a sync Node wrapping the async node for `nodeid`."""
        return Node(self.tloop, self.aio_obj.get_node(nodeid))

    @syncmethod
    def connect_and_get_server_endpoints(self):
        pass

    def __enter__(self):
        try:
            self.connect()
        except Exception:
            # __exit__ is not called when __enter__ raises, so clean up here.
            # A failure of the cleanup must not hide the connect error.
            try:
                self.disconnect()
            except Exception:
                logger.debug("disconnect after failed connect raised", exc_info=True)
            raise
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.disconnect()
165
166
167
class Shortcuts:
    """Sync counterpart of `shortcuts.Shortcuts`: each attribute of the async object is mirrored as a sync Node."""

    def __init__(self, tloop, aio_server):
        self.tloop = tloop
        self.aio_obj = shortcuts.Shortcuts(aio_server)
        # Expose every async shortcut node under the same attribute name.
        for name, aio_node in vars(self.aio_obj).items():
            setattr(self, name, Node(self.tloop, aio_node))
173
174
175
class Server:
    """
    Blocking wrapper around the async `server.Server`.

    The async server is stored in `aio_obj` and driven through `tloop`.
    When no thread loop is passed in, the server creates and starts its own
    and stops it again in `stop`. Methods decorated with `syncmethod` are
    stubs: the call is forwarded to the async method of the same name.
    """

    def __init__(self, shelf_file=None, tloop=None):
        self.tloop = tloop
        self.close_tloop = False  # True only when the loop was created here
        if not self.tloop:
            self.tloop = ThreadLoop()
            self.tloop.start()
            self.close_tloop = True
        try:
            self.aio_obj = server.Server(loop=self.tloop.loop)
            self.tloop.post(self.aio_obj.init(shelf_file))
            self.nodes = Shortcuts(self.tloop, self.aio_obj.iserver.isession)
        except Exception:
            # Nobody gets a reference to a half-built server, so a loop
            # thread started above would otherwise never be stopped.
            if self.close_tloop:
                self.tloop.stop()
            raise

    def __str__(self):
        return "Sync" + self.aio_obj.__str__()
    __repr__ = __str__

    def __enter__(self):
        try:
            self.start()
        except Exception:
            # __exit__ is not called when __enter__ raises, so clean up here.
            # A failure of the cleanup must not hide the start error.
            try:
                self.stop()
            except Exception:
                logger.debug("stop after failed start raised", exc_info=True)
            raise
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.stop()

    def set_endpoint(self, url):
        return self.aio_obj.set_endpoint(url)

    def set_server_name(self, name):
        return self.aio_obj.set_server_name(name)

    def set_security_policy(self, security_policy):
        return self.aio_obj.set_security_policy(security_policy)

    def disable_clock(self, boolean):
        return self.aio_obj.disable_clock(boolean)

    @syncmethod
    def register_namespace(self, url):
        pass

    @syncmethod
    def start(self):
        pass

    def stop(self):
        """Stop the async server and stop the thread loop if this server owns it."""
        try:
            self.tloop.post(self.aio_obj.stop())
        finally:
            # Stop an owned loop even when stopping the server failed,
            # otherwise its thread keeps running.
            if self.close_tloop:
                self.tloop.stop()

    def link_method(self, node, callback):
        return self.aio_obj.link_method(node, callback)

    @syncmethod
    def get_event_generator(self, etype=None, emitting_node=ua.ObjectIds.Server):
        pass

    def get_node(self, nodeid):
        """Return a sync Node wrapping the async node for `nodeid`."""
        return Node(self.tloop, self.aio_obj.get_node(nodeid))

    @syncmethod
    def import_xml(self, path=None, xmlstring=None):
        pass

    @syncmethod
    def get_namespace_index(self, url):
        pass

    @syncmethod
    def load_enums(self):
        pass

    @syncmethod
    def load_type_definitions(self):
        pass

    @syncmethod
    def write_attribute_value(self, nodeid, datavalue, attr=ua.AttributeIds.Value):
        pass
252
253
254
class EventGenerator:
    """Sync wrapper around an async event generator."""

    def __init__(self, tloop, aio_evgen):
        self.tloop = tloop
        self.aio_obj = aio_evgen

    @property
    def event(self):
        """The `event` attribute of the wrapped async generator."""
        wrapped = self.aio_obj
        return wrapped.event

    def trigger(self, time=None, message=None):
        """Post the async generator's `trigger(time, message)` to the thread loop and return the result."""
        pending = self.aio_obj.trigger(time, message)
        return self.tloop.post(pending)
265
266
267
class Node:
    """
    Blocking wrapper around an async node.

    The async node is stored in `aio_obj`. Equality, hashing and string
    conversion are delegated to it. Methods decorated with `syncmethod` are
    stubs: the call is forwarded to the async node's method of the same
    name and executed on `tloop`.
    """

    def __init__(self, tloop, aio_node):
        self.aio_obj = aio_node
        self.tloop = tloop

    def __eq__(self, other):
        # Anything that is not a sync Node (None included) is simply unequal,
        # instead of failing on a missing `aio_obj` attribute.
        return isinstance(other, Node) and self.aio_obj == other.aio_obj

    def __ne__(self, other):
        return not self.__eq__(other)

    def __str__(self):
        return self.aio_obj.__str__()

    def __repr__(self):
        return "Sync" + self.aio_obj.__repr__()

    def __hash__(self):
        return self.aio_obj.__hash__()

    @property
    def nodeid(self):
        """Node id of the wrapped async node (read and write)."""
        return self.aio_obj.nodeid

    @nodeid.setter
    def nodeid(self, value):
        self.aio_obj.nodeid = value

    @syncmethod
    def read_browse_name(self):
        pass

    @syncmethod
    def read_display_name(self):
        pass

    get_display_name = read_display_name  # legacy

    @syncmethod
    def get_children(
        self, refs=ua.ObjectIds.HierarchicalReferences, nodeclassmask=ua.NodeClass.Unspecified
    ):
        pass

    @syncmethod
    def get_properties(self):
        pass

    @syncmethod
    def get_children_descriptions(
        self,
        refs=ua.ObjectIds.HierarchicalReferences,
        nodeclassmask=ua.NodeClass.Unspecified,
        includesubtypes=True,
    ):
        pass

    @syncmethod
    def get_user_access_level(self):
        pass

    @syncmethod
    def get_child(self, path):
        pass

    @syncmethod
    def set_modelling_rule(self, mandatory: bool):
        pass

    @syncmethod
    def add_variable(self, ns, name, val):
        pass

    @syncmethod
    def add_property(self, ns, name, val):
        pass

    @syncmethod
    def add_object(self, ns, name):
        pass

    @syncmethod
    def add_object_type(self, ns, name):
        pass

    @syncmethod
    def add_folder(self, ns, name):
        pass

    @syncmethod
    def add_method(self, *args):
        pass

    @syncmethod
    def set_writable(self, writable=True):
        pass

    @syncmethod
    def write_value(self, val):
        pass

    set_value = write_value  # legacy

    @syncmethod
    def write_params(self, params):
        pass

    @syncmethod
    def read_params(self, params):
        pass

    @syncmethod
    def read_value(self):
        pass

    get_value = read_value  # legacy

    @syncmethod
    def read_data_type_as_variant_type(self):
        pass

    get_data_type_as_variant_type = read_data_type_as_variant_type  # legacy

    @syncmethod
    def call_method(self, methodid, *args):
        pass

    @syncmethod
    def get_references(
        self,
        refs=ua.ObjectIds.References,
        direction=ua.BrowseDirection.Both,
        nodeclassmask=ua.NodeClass.Unspecified,
        includesubtypes=True,
    ):
        pass

    @syncmethod
    def read_description(self):
        pass

    @syncmethod
    def get_variables(self):
        pass

    @syncmethod
    def get_path(self):
        pass

    @syncmethod
    def read_node_class(self):
        pass

    @syncmethod
    def read_attributes(self):
        pass
423
424
425
class Subscription:
    """
    Blocking wrapper around an async subscription.

    The async subscription is stored in `aio_obj`. Methods decorated with
    `syncmethod` are stubs: the call is forwarded to the async
    subscription's method of the same name and executed on `tloop`.
    """

    def __init__(self, tloop, sub):
        self.tloop = tloop
        self.aio_obj = sub

    @syncmethod
    def subscribe_data_change(self, nodes, attr=ua.AttributeIds.Value, queuesize=0):
        pass

    @syncmethod
    def subscribe_events(
        self,
        sourcenode=ua.ObjectIds.Server,
        evtypes=ua.ObjectIds.BaseEventType,
        evfilter=None,
        queuesize=0,
    ):
        pass

    def _make_monitored_item_request(self, node: Node, attr, mfilter, queuesize) -> ua.MonitoredItemCreateRequest:
        # Called directly on the async subscription, without going through the thread loop.
        return self.aio_obj._make_monitored_item_request(node, attr, mfilter, queuesize)

    @syncmethod
    def unsubscribe(self, handle):
        pass

    @syncmethod
    def create_monitored_items(self, monitored_items):
        # Plain `def` like every other stub: the body never runs, the
        # decorator forwards the call to the async subscription.
        pass

    @syncmethod
    def delete(self):
        pass
458