Completed
Pull Request — master (#184)
by Olivier
03:02
created

asyncua.sync.Node.read_display_name()   A

Complexity

Conditions 1

Size

Total Lines 3
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 3
nop 1
dl 0
loc 3
rs 10
c 0
b 0
f 0
1
"""
2
sync API of asyncua
3
"""
4
import asyncio
5
from threading import Thread, Condition
6
import logging
7
8
from asyncua import ua
9
from asyncua import client
10
from asyncua import server
11
from asyncua.common import node
12
from asyncua.common import subscription, shortcuts
13
14
logger = logging.getLogger(__name__)
15
16
17
class ThreadLoopNotRunning(Exception):
18
    pass
19
20
21
class ThreadLoop(Thread):
22
    def __init__(self):
23
        Thread.__init__(self)
24
        self.loop = None
25
        self._cond = Condition()
26
27
    def start(self):
28
        with self._cond:
29
            Thread.start(self)
30
            self._cond.wait()
31
32
    def run(self):
33
        self.loop = asyncio.new_event_loop()
34
        logger.debug("Threadloop: %s", self.loop)
35
        self.loop.call_soon_threadsafe(self._notify_start)
36
        self.loop.run_forever()
37
38
    def _notify_start(self):
39
        with self._cond:
40
            self._cond.notify_all()
41
42
    def stop(self):
43
        self.loop.call_soon_threadsafe(self.loop.stop)
44
        self.join()
45
        self.loop.close()
46
47
    def post(self, coro):
48
        if not self.loop or not self.loop.is_running():
49
            raise ThreadLoopNotRunning(f"could not post {coro}")
50
        futur = asyncio.run_coroutine_threadsafe(coro, loop=self.loop)
51
        return futur.result()
52
53
    def __enter__(self):
54
        self.start()
55
        return self
56
57
    def __exit__(self, exc_t, exc_v, trace):
58
        self.stop()
59
60
61
def syncmethod(func):
62
    def wrapper(self, *args, **kwargs):
63
        args = list(args)  #FIXME: might be very inefficient...
64
        for idx, arg in enumerate(args):
65
            if isinstance(arg, Node):
66
                args[idx] = arg.aio_obj
67
        for k, v in kwargs.items():
68
            if isinstance(v, Node):
69
                kwargs[k] = v.aio_obj
70
        aio_func = getattr(self.aio_obj, func.__name__)
71
        result = self.tloop.post(aio_func(*args, **kwargs))
72
        if isinstance(result, node.Node):
73
            return Node(self.tloop, result)
74
        if isinstance(result, list) and len(result) > 0 and isinstance(result[0], node.Node):
75
            return [Node(self.tloop, i) for i in result]
76
        if isinstance(result, server.event_generator.EventGenerator):
77
            return EventGenerator(result)
78
        if isinstance(result, subscription.Subscription):
79
            return Subscription(self.tloop, result)
80
        return result
81
82
    return wrapper
83
84
85
class _SubHandler:
86
    def __init__(self, tloop, sync_handler):
87
        self.tloop = tloop
88
        self.sync_handler = sync_handler
89
90
    def datachange_notification(self, node, val, data):
91
        self.sync_handler.datachange_notification(Node(self.tloop, node), val, data)
92
93
    def event_notification(self, event):
94
        self.sync_handler.event_notification(event)
95
96
97
class Client:
98
    def __init__(self, url: str, timeout: int = 4, tloop=None):
99
        self.tloop = tloop
100
        self.close_tloop = False
101
        if not self.tloop:
102
            self.tloop = ThreadLoop()
103
            self.tloop.start()
104
            self.close_tloop = True
105
        self.aio_obj = client.Client(url, timeout, loop=self.tloop.loop)
106
        self.nodes = Shortcuts(self.tloop, self.aio_obj.uaclient)
107
108
    def __str__(self):
109
        return "Sync" + self.aio_obj.__str__()
110
    __repr__ = __str__
111
112
    @syncmethod
113
    def connect(self):
114
        pass
115
116
    def disconnect(self):
117
        self.tloop.post(self.aio_obj.disconnect())
118
        if self.close_tloop:
119
            self.tloop.stop()
120
121
    @syncmethod
122
    def load_type_definitions(self, nodes=None):
123
        pass
124
125
    @syncmethod
126
    def set_security(self):
127
        pass
128
129
    @syncmethod
130
    def load_enums(self):
131
        pass
132
133
    def create_subscription(self, period, handler):
134
        coro = self.aio_obj.create_subscription(period, _SubHandler(self.tloop, handler))
135
        aio_sub = self.tloop.post(coro)
136
        return Subscription(self.tloop, aio_sub)
137
138
    @syncmethod
139
    def get_namespace_index(self, url):
140
        pass
141
142
    def get_node(self, nodeid):
143
        return Node(self.tloop, self.aio_obj.get_node(nodeid))
144
145
    @syncmethod
146
    def connect_and_get_server_endpoints(self):
147
        pass
148
149
    def __enter__(self):
150
        self.connect()
151
        return self
152
153
    def __exit__(self, exc_type, exc_value, traceback):
154
        self.disconnect()
155
156
157
class Shortcuts:
158
    def __init__(self, tloop, aio_server):
159
        self.tloop = tloop
160
        self.aio_obj = shortcuts.Shortcuts(aio_server)
161
        for k, v in self.aio_obj.__dict__.items():
162
            setattr(self, k, Node(self.tloop, v))
163
164
165
class Server:
166
    def __init__(self, shelf_file=None, tloop=None):
167
        self.tloop = tloop
168
        self.close_tloop = False
169
        if not self.tloop:
170
            self.tloop = ThreadLoop()
171
            self.tloop.start()
172
            self.close_tloop = True
173
        self.aio_obj = server.Server(loop=self.tloop.loop)
174
        self.tloop.post(self.aio_obj.init(shelf_file))
175
        self.nodes = Shortcuts(self.tloop, self.aio_obj.iserver.isession)
176
177
    def __str__(self):
178
        return "Sync" + self.aio_obj.__str__()
179
    __repr__ = __str__
180
181
    def __enter__(self):
182
        self.start()
183
        return self
184
185
    def __exit__(self, exc_type, exc_value, traceback):
186
        self.stop()
187
188
    def set_endpoint(self, url):
189
        return self.aio_obj.set_endpoint(url)
190
191
    def set_server_name(self, name):
192
        return self.aio_obj.set_server_name(name)
193
194
    def set_security_policy(self, security_policy):
195
        return self.aio_obj.set_security_policy(security_policy)
196
197
    def disable_clock(self, boolean):
198
        return self.aio_obj.disable_clock(boolean)
199
200
    @syncmethod
201
    def register_namespace(self, url):
202
        return self.aio_obj.register_namespace(url)
203
204
    @syncmethod
205
    def start(self):
206
        pass
207
208
    def stop(self):
209
        self.tloop.post(self.aio_obj.stop())
210
        if self.close_tloop:
211
            self.tloop.stop()
212
213
    def link_method(self, node, callback):
214
        return self.aio_obj.link_method(node, callback)
215
216
    @syncmethod
217
    def get_event_generator(self, etype=None, emitting_node=ua.ObjectIds.Server):
218
        pass
219
220
    def get_node(self, nodeid):
221
        return Node(self.tloop, self.aio_obj.get_node(nodeid))
222
223
    @syncmethod
224
    def import_xml(self, path=None, xmlstring=None):
225
        pass
226
227
    @syncmethod
228
    def get_namespace_index(self, url):
229
        pass
230
231
    @syncmethod
232
    def load_enums(self):
233
        pass
234
235
    @syncmethod
236
    def load_type_definitions(self):
237
        pass
238
239
    def write_attribute_value(self, nodeid, datavalue, attr=ua.AttributeIds.Value):
240
        return self.aio_obj.write_attribute_value(nodeid, datavalue, attr)
241
242
243
class EventGenerator:
244
    def __init__(self, aio_evgen):
245
        self.aio_obj = aio_evgen
246
247
    @property
248
    def event(self):
249
        return self.aio_obj.event
250
251
    def trigger(self, time=None, message=None):
252
        return self.aio_obj.trigger(time, message)
253
254
255
class Node:
256
    def __init__(self, tloop, aio_node):
257
        self.aio_obj = aio_node
258
        self.tloop = tloop
259
260
    def __eq__(self, other):
261
        return self.aio_obj == other.aio_obj
262
263
    def __ne__(self, other):
264
        return not self.__eq__(other)
265
266
    def __str__(self):
267
        return "Sync" + self.aio_obj.__str__()
268
269
    __repr__ = __str__
270
271
    def __hash__(self):
272
        return self.aio_obj.__hash__()
273
274
    @property
275
    def nodeid(self):
276
        return self.aio_obj.nodeid
277
278
    @syncmethod
279
    def read_browse_name(self):
280
        pass
281
282
    @syncmethod
283
    def read_display_name(self):
284
        pass
285
286
    @syncmethod
287
    def get_children(
288
        self, refs=ua.ObjectIds.HierarchicalReferences, nodeclassmask=ua.NodeClass.Unspecified
289
    ):
290
        pass
291
292
    @syncmethod
293
    def get_children_descriptions(
294
        self,
295
        refs=ua.ObjectIds.HierarchicalReferences,
296
        nodeclassmask=ua.NodeClass.Unspecified,
297
        includesubtypes=True,
298
    ):
299
        pass
300
301
    @syncmethod
302
    def get_child(self, path):
303
        pass
304
305
    @syncmethod
306
    def set_modelling_rule(self, mandatory: bool):
307
        pass
308
309
    @syncmethod
310
    def add_variable(self, ns, name, val):
311
        pass
312
313
    @syncmethod
314
    def add_property(self, ns, name, val):
315
        pass
316
317
    @syncmethod
318
    def add_object(self, ns, name):
319
        pass
320
321
    @syncmethod
322
    def add_object_type(self, ns, name):
323
        pass
324
325
    @syncmethod
326
    def add_folder(self, ns, name):
327
        pass
328
329
    @syncmethod
330
    def add_method(self, *args):
331
        pass
332
333
    @syncmethod
334
    def set_writable(self, writable=True):
335
        pass
336
337
    @syncmethod
338
    def write_value(self, val):
339
        pass
340
341
    set_value = write_value  # legacy
342
343
    @syncmethod
344
    def read_value(self):
345
        pass
346
347
    get_value = read_value  # legacy
348
349
    @syncmethod
350
    def call_method(self, methodid, *args):
351
        pass
352
353
    @syncmethod
354
    def get_references(
355
        self,
356
        refs=ua.ObjectIds.References,
357
        direction=ua.BrowseDirection.Both,
358
        nodeclassmask=ua.NodeClass.Unspecified,
359
        includesubtypes=True,
360
    ):
361
        pass
362
363
    @syncmethod
364
    def get_description(self):
365
        pass
366
367
    @syncmethod
368
    def get_variables(self):
369
        pass
370
371
    @syncmethod
372
    def get_path(self):
373
        pass
374
375
    @syncmethod
376
    def read_node_class(self):
377
        pass
378
379
    @syncmethod
380
    def read_attributes(self):
381
        pass
382
383
class Subscription:
384
    def __init__(self, tloop, sub):
385
        self.tloop = tloop
386
        self.aio_obj = sub
387
388
    @syncmethod
389
    def subscribe_data_change(self, nodes, attr=ua.AttributeIds.Value, queuesize=0):
390
        pass
391
392
    @syncmethod
393
    def subscribe_events(
394
        self,
395
        sourcenode=ua.ObjectIds.Server,
396
        evtypes=ua.ObjectIds.BaseEventType,
397
        evfilter=None,
398
        queuesize=0,
399
    ):
400
        pass
401
402
    @syncmethod
403
    def unsubscribe(self, handle):
404
        pass
405
406
    @syncmethod
407
    async def create_monitored_items(self, monitored_items):
408
        pass
409
410
    @syncmethod
411
    def delete(self):
412
        pass
413