Passed
Pull Request — master (#120)
by Olivier
02:31
created

asyncua.sync.Node.read_value()   A

Complexity

Conditions 1

Size

Total Lines 3
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 3
nop 2
dl 0
loc 3
rs 10
c 0
b 0
f 0
1
"""
Sync API of asyncua: blocking wrappers that run the asyncio objects on an
event loop living in a background thread.
"""
4
import asyncio
import functools
import logging
from threading import Thread, Condition

from asyncua import ua
from asyncua import client
from asyncua import server
from asyncua.common import node
from asyncua.common import subscription, shortcuts
13
14
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
15
16
17
class ThreadLoopNotRunning(Exception):
    """Raised when work is posted to a thread loop whose event loop is not running."""
19
20
21
class ThreadLoop(Thread):
    """
    Thread that creates an asyncio event loop and runs it until stop() is called.

    Other threads hand coroutines to the loop with post() and block until the
    result is available. The object can be used as a context manager: the
    thread is started on entry, then stopped and joined on exit.
    """

    def __init__(self):
        super().__init__()
        # The loop is created in run(), i.e. by the thread that executes it.
        self.loop = None
        # Lets start() block until the loop is actually processing callbacks.
        self._cond = Condition()

    def start(self):
        """Start the thread and block until its event loop is running."""
        with self._cond:
            # The condition is held while the thread starts, so
            # _notify_start() cannot acquire it (and notify) before wait()
            # below has released it.
            super().start()
            self._cond.wait()

    def run(self):
        """Thread body: create the event loop, run it, and close it once stopped."""
        self.loop = asyncio.new_event_loop()
        logger.debug("Threadloop: %s", self.loop)
        # First callback executed by the loop: wakes up the thread blocked in start().
        self.loop.call_soon_threadsafe(self._notify_start)
        try:
            self.loop.run_forever()
        finally:
            # Release the loop's resources instead of leaving them to the
            # garbage collector.
            self.loop.close()

    def _notify_start(self):
        """Wake up the thread waiting in start(); runs inside the event loop."""
        with self._cond:
            self._cond.notify_all()

    def stop(self):
        """
        Ask the event loop to stop; returns without waiting for the thread to end.

        Does nothing when the loop was never created or is already closed, so
        calling stop() more than once is harmless.
        """
        loop = self.loop
        if loop is None or loop.is_closed():
            return
        try:
            loop.call_soon_threadsafe(loop.stop)
        except RuntimeError:
            # run() closed the loop between the check above and this call:
            # the loop has already stopped, nothing left to do.
            pass

    def post(self, coro):
        """
        Run ``coro`` in the event loop and block until it finishes.

        Must be called from a thread other than the loop thread, otherwise the
        blocking wait below would stall the loop itself.

        :param coro: coroutine object to schedule on the loop
        :return: the value returned by the coroutine
        :raises ThreadLoopNotRunning: if the loop has not been created or is not running
        :raises Exception: whatever exception the coroutine raised
        """
        if self.loop is None or not self.loop.is_running():
            raise ThreadLoopNotRunning(f"could not post {coro}")
        future = asyncio.run_coroutine_threadsafe(coro, loop=self.loop)
        return future.result()

    def __enter__(self):
        self.start()
        return self

    def __exit__(self, exc_t, exc_v, trace):
        self.stop()
        self.join()
58
59
60
def _unwrap_nodes(value):
    """
    Replace sync ``Node`` wrappers by the asyncua objects they wrap.

    A ``Node`` is replaced by its ``aio_obj``. A list or tuple holding at least
    one ``Node`` is replaced by a new list in which every ``Node`` is unwrapped.
    Any other value is returned untouched.
    """
    if isinstance(value, Node):
        return value.aio_obj
    if isinstance(value, (list, tuple)) and any(isinstance(item, Node) for item in value):
        return [item.aio_obj if isinstance(item, Node) else item for item in value]
    return value


def syncmethod(func):
    """
    Decorator turning a stub method into a blocking call to the asyncio API.

    The decorated function's body is never executed; only its name is used.
    The wrapper looks up the attribute of that name on ``self.aio_obj``, calls
    it with the given arguments (sync ``Node`` wrappers unwrapped first), posts
    the resulting coroutine to ``self.tloop`` and waits for the result.

    Results coming back from asyncua are wrapped again: a node, a non-empty
    list whose first item is a node, an event generator and a subscription
    are returned as their sync counterparts; anything else is returned as is.
    """

    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        aio_args = [_unwrap_nodes(arg) for arg in args]
        aio_kwargs = {key: _unwrap_nodes(value) for key, value in kwargs.items()}
        aio_func = getattr(self.aio_obj, func.__name__)
        result = self.tloop.post(aio_func(*aio_args, **aio_kwargs))
        if isinstance(result, node.Node):
            return Node(self.tloop, result)
        # Only the first item is inspected to decide whether this is a list of nodes.
        if isinstance(result, list) and result and isinstance(result[0], node.Node):
            return [Node(self.tloop, item) for item in result]
        if isinstance(result, server.event_generator.EventGenerator):
            return EventGenerator(result)
        if isinstance(result, subscription.Subscription):
            return Subscription(self.tloop, result)
        return result

    return wrapper
82
83
84
class _SubHandler:
    """
    Adapter placed between an asyncua subscription and a user supplied handler.

    It forwards notifications to ``sync_handler``, wrapping the node of a
    data change notification into a sync ``Node`` first.
    """

    def __init__(self, tloop, sync_handler):
        """
        :param tloop: thread loop handed to the sync ``Node`` objects created here
        :param sync_handler: user object providing ``datachange_notification``
            and ``event_notification``
        """
        self.tloop = tloop
        self.sync_handler = sync_handler

    def datachange_notification(self, node, val, data):
        """Forward a data change, passing the node wrapped as a sync ``Node``."""
        # NOTE: the ``node`` parameter shadows the module-level ``node`` import
        # inside this method only.
        sync_node = Node(self.tloop, node)
        self.sync_handler.datachange_notification(sync_node, val, data)

    def event_notification(self, event):
        """Forward an event notification unchanged."""
        self.sync_handler.event_notification(event)
94
95
96
class Client:
    """
    Blocking wrapper around the asyncio ``client.Client``.

    Every call is executed on the event loop of a ``ThreadLoop``. When no
    thread loop is supplied, the client creates and starts its own and stops
    it again in disconnect(). Usable as a context manager: connects on entry
    and disconnects on exit.
    """

    def __init__(self, url: str, timeout: int = 4, tloop=None):
        """
        :param url: passed unchanged to ``client.Client``
        :param timeout: passed unchanged to ``client.Client``
            (presumably seconds -- TODO confirm against asyncua)
        :param tloop: running ``ThreadLoop`` to use; a private one is created
            and started when omitted
        """
        self.tloop = tloop
        # True only when the thread loop was created here and must be stopped here.
        self.close_tloop = False
        if not self.tloop:
            self.tloop = ThreadLoop()
            self.tloop.start()
            self.close_tloop = True
        self.aio_obj = client.Client(url, timeout, loop=self.tloop.loop)
        self.nodes = Shortcuts(self.tloop, self.aio_obj.uaclient)

    def __str__(self):
        return "Sync" + str(self.aio_obj)

    __repr__ = __str__

    @syncmethod
    def connect(self):
        """Call ``connect`` on the wrapped client and wait for it to finish."""

    def disconnect(self):
        """
        Disconnect the wrapped client and stop the thread loop if this client owns it.

        The owned thread loop is stopped even when the disconnect itself
        raises, so a failed disconnect does not leave the loop thread running.
        """
        try:
            self.tloop.post(self.aio_obj.disconnect())
        finally:
            if self.close_tloop:
                self.tloop.stop()

    @syncmethod
    def load_type_definitions(self, nodes=None):
        """Call ``load_type_definitions`` on the wrapped client and return its result."""

    @syncmethod
    def load_enums(self):
        """Call ``load_enums`` on the wrapped client and return its result."""

    def create_subscription(self, period, handler):
        """
        Create a subscription and return it wrapped as a sync ``Subscription``.

        :param period: passed unchanged to the wrapped client
        :param handler: user handler; it is wrapped in a ``_SubHandler`` so that
            it receives sync ``Node`` objects in data change notifications
        """
        adapter = _SubHandler(self.tloop, handler)
        aio_sub = self.tloop.post(self.aio_obj.create_subscription(period, adapter))
        return Subscription(self.tloop, aio_sub)

    @syncmethod
    def get_namespace_index(self, url):
        """Call ``get_namespace_index`` on the wrapped client and return its result."""

    def get_node(self, nodeid):
        """Return a sync ``Node`` wrapping ``get_node(nodeid)`` of the wrapped client."""
        return Node(self.tloop, self.aio_obj.get_node(nodeid))

    def __enter__(self):
        self.connect()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.disconnect()
146
147
148
class Shortcuts:
    """
    Sync counterpart of ``shortcuts.Shortcuts``.

    Every instance attribute of the asyncio shortcuts object is exposed under
    the same name, wrapped in a sync ``Node``.
    """

    def __init__(self, tloop, aio_server):
        """
        :param tloop: thread loop handed to every ``Node`` created here
        :param aio_server: object passed unchanged to ``shortcuts.Shortcuts``
        """
        self.tloop = tloop
        self.aio_obj = shortcuts.Shortcuts(aio_server)
        for name, aio_node in self.aio_obj.__dict__.items():
            setattr(self, name, Node(self.tloop, aio_node))
154
155
156
class Server:
    """
    Blocking wrapper around the asyncio ``server.Server``.

    Calls decorated with ``syncmethod`` are executed on the event loop of a
    ``ThreadLoop``. When no thread loop is supplied, the server creates and
    starts its own and stops it again in stop(). Usable as a context manager:
    starts on entry and stops on exit.

    NOTE(review): the undecorated pass-through methods (``set_endpoint``,
    ``set_server_name``, ``set_security_policy``, ``disable_clock``,
    ``link_method``, ``set_attribute_value``) call the wrapped server directly
    from the calling thread and return whatever it returns -- confirm none of
    them is a coroutine function in asyncua.
    """

    def __init__(self, shelf_file=None, tloop=None):
        """
        :param shelf_file: passed unchanged to ``init`` of the wrapped server
        :param tloop: running ``ThreadLoop`` to use; a private one is created
            and started when omitted
        """
        self.tloop = tloop
        # True only when the thread loop was created here and must be stopped here.
        self.close_tloop = False
        if not self.tloop:
            self.tloop = ThreadLoop()
            self.tloop.start()
            self.close_tloop = True
        self.aio_obj = server.Server(loop=self.tloop.loop)
        self.tloop.post(self.aio_obj.init(shelf_file))
        self.nodes = Shortcuts(self.tloop, self.aio_obj.iserver.isession)

    def __str__(self):
        return "Sync" + str(self.aio_obj)

    __repr__ = __str__

    def __enter__(self):
        self.start()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.stop()

    def set_endpoint(self, url):
        """Forward to ``set_endpoint`` of the wrapped server."""
        return self.aio_obj.set_endpoint(url)

    def set_server_name(self, name):
        """Forward to ``set_server_name`` of the wrapped server."""
        return self.aio_obj.set_server_name(name)

    def set_security_policy(self, security_policy):
        """Forward to ``set_security_policy`` of the wrapped server."""
        return self.aio_obj.set_security_policy(security_policy)

    def disable_clock(self, boolean):
        """Forward to ``disable_clock`` of the wrapped server."""
        return self.aio_obj.disable_clock(boolean)

    @syncmethod
    def register_namespace(self, url):
        """Call ``register_namespace`` on the wrapped server and return its result."""

    @syncmethod
    def start(self):
        """Call ``start`` on the wrapped server and wait for it to finish."""

    def stop(self):
        """
        Stop the wrapped server and stop the thread loop if this server owns it.

        The owned thread loop is stopped even when stopping the server raises,
        so a failed stop does not leave the loop thread running.
        """
        try:
            self.tloop.post(self.aio_obj.stop())
        finally:
            if self.close_tloop:
                self.tloop.stop()

    def link_method(self, node, callback):
        """Forward to ``link_method`` of the wrapped server; ``node`` is passed as given."""
        return self.aio_obj.link_method(node, callback)

    @syncmethod
    def get_event_generator(self, etype=None, emitting_node=ua.ObjectIds.Server):
        """Call ``get_event_generator`` on the wrapped server and return its result."""

    def get_node(self, nodeid):
        """Return a sync ``Node`` wrapping ``get_node(nodeid)`` of the wrapped server."""
        return Node(self.tloop, self.aio_obj.get_node(nodeid))

    @syncmethod
    def import_xml(self, path=None, xmlstring=None):
        """Call ``import_xml`` on the wrapped server and return its result."""

    @syncmethod
    def get_namespace_index(self, url):
        """Call ``get_namespace_index`` on the wrapped server and return its result."""

    @syncmethod
    def load_enums(self):
        """Call ``load_enums`` on the wrapped server and return its result."""

    @syncmethod
    def load_type_definitions(self):
        """Call ``load_type_definitions`` on the wrapped server and return its result."""

    def set_attribute_value(self, nodeid, datavalue, attr=ua.AttributeIds.Value):
        """Forward to ``set_attribute_value`` of the wrapped server."""
        return self.aio_obj.set_attribute_value(nodeid, datavalue, attr)
232
233
234
class EventGenerator:
    """Thin wrapper giving access to an asyncio event generator from sync code."""

    def __init__(self, aio_evgen):
        """
        :param aio_evgen: the event generator object to wrap
        """
        self.aio_obj = aio_evgen

    @property
    def event(self):
        """The ``event`` attribute of the wrapped generator."""
        return self.aio_obj.event

    def trigger(self, time=None, message=None):
        """
        Forward to ``trigger(time, message)`` of the wrapped generator.

        NOTE(review): the call is made directly and its result returned as is;
        it is not posted to a thread loop. If ``trigger`` is a coroutine
        function in asyncua, the caller receives an un-awaited coroutine --
        confirm against asyncua.
        """
        return self.aio_obj.trigger(time, message)
244
245
246
class Node:
    """
    Blocking wrapper around an asyncio ``node.Node``.

    Methods decorated with ``syncmethod`` are stubs: the call is forwarded to
    the method of the same name on the wrapped node, executed on the thread
    loop, and the result is returned once available. Equality and hashing are
    delegated to the wrapped node.
    """

    def __init__(self, tloop, aio_node):
        """
        :param tloop: thread loop used to run the wrapped node's coroutines
        :param aio_node: the node object to wrap
        """
        self.aio_obj = aio_node
        self.tloop = tloop

    def __hash__(self):
        return hash(self.aio_obj)

    def __eq__(self, other):
        # Anything that is not a sync Node cannot be compared here; let Python
        # try the reflected comparison instead of failing on a missing
        # ``aio_obj`` attribute.
        if not isinstance(other, Node):
            return NotImplemented
        return self.aio_obj == other.aio_obj

    def __str__(self):
        return "Sync" + str(self.aio_obj)

    __repr__ = __str__

    @property
    def nodeid(self):
        """The ``nodeid`` attribute of the wrapped node."""
        return self.aio_obj.nodeid

    @syncmethod
    def get_browse_name(self):
        """Call ``get_browse_name`` on the wrapped node and return its result."""

    @syncmethod
    def get_children(self, refs=ua.ObjectIds.HierarchicalReferences, nodeclassmask=ua.NodeClass.Unspecified):
        """Call ``get_children`` on the wrapped node and return its result."""

    @syncmethod
    def get_children_descriptions(self,
                                  refs=ua.ObjectIds.HierarchicalReferences,
                                  nodeclassmask=ua.NodeClass.Unspecified,
                                  includesubtypes=True):
        """Call ``get_children_descriptions`` on the wrapped node and return its result."""

    @syncmethod
    def get_child(self, path):
        """Call ``get_child`` on the wrapped node and return its result."""

    @syncmethod
    def set_modelling_rule(self, mandatory: bool):
        """Call ``set_modelling_rule`` on the wrapped node and return its result."""

    @syncmethod
    def add_variable(self, ns, name, val):
        """Call ``add_variable`` on the wrapped node and return its result."""

    @syncmethod
    def add_property(self, ns, name, val):
        """Call ``add_property`` on the wrapped node and return its result."""

    @syncmethod
    def add_object(self, ns, name):
        """Call ``add_object`` on the wrapped node and return its result."""

    @syncmethod
    def add_object_type(self, ns, name):
        """Call ``add_object_type`` on the wrapped node and return its result."""

    @syncmethod
    def add_folder(self, ns, name):
        """Call ``add_folder`` on the wrapped node and return its result."""

    @syncmethod
    def add_method(self, *args):
        """Call ``add_method`` on the wrapped node and return its result."""

    @syncmethod
    def set_writable(self, writable=True):
        """Call ``set_writable`` on the wrapped node and return its result."""

    @syncmethod
    def write_value(self, val):
        """Call ``write_value`` on the wrapped node and return its result."""

    @syncmethod
    def read_value(self):
        """Call ``read_value`` on the wrapped node and return its result."""

    @syncmethod
    def call_method(self, methodid, *args):
        """Call ``call_method`` on the wrapped node and return its result."""
327
328
329
class Subscription:
    """
    Blocking wrapper around an asyncio ``subscription.Subscription``.

    Every method is a ``syncmethod`` stub: the call is forwarded to the method
    of the same name on the wrapped subscription, executed on the thread loop,
    and the result is returned once available.
    """

    def __init__(self, tloop, sub):
        """
        :param tloop: thread loop used to run the wrapped subscription's coroutines
        :param sub: the subscription object to wrap
        """
        self.tloop = tloop
        self.aio_obj = sub

    @syncmethod
    def subscribe_data_change(self, nodes, attr=ua.AttributeIds.Value, queuesize=0):
        """Call ``subscribe_data_change`` on the wrapped subscription and return its result."""

    @syncmethod
    def subscribe_events(self,
                         sourcenode=ua.ObjectIds.Server,
                         evtypes=ua.ObjectIds.BaseEventType,
                         evfilter=None,
                         queuesize=0):
        """Call ``subscribe_events`` on the wrapped subscription and return its result."""

    @syncmethod
    def unsubscribe(self, handle):
        """Call ``unsubscribe`` on the wrapped subscription and return its result."""

    @syncmethod
    def create_monitored_items(self, monitored_items):
        """Call ``create_monitored_items`` on the wrapped subscription and return its result."""

    @syncmethod
    def delete(self):
        """Call ``delete`` on the wrapped subscription and return its result."""
357