Completed
Pull Request — master (#51)
by Olivier
05:38
created

asyncua.sync.Node.add_object()   A

Complexity

Conditions 1

Size

Total Lines 3
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 3
nop 3
dl 0
loc 3
rs 10
c 0
b 0
f 0
1
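"""
Synchronous API of asyncua.

Every call is executed on an asyncio event loop that runs in a background
thread (see ThreadLoop); the calling thread blocks until the result is ready.
"""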
4
import asyncio
import functools
import logging
from threading import Thread, Condition

from asyncua import ua
from asyncua import client
from asyncua import server
from asyncua.common import node
from asyncua.common import subscription, shortcuts
13
14
logger = logging.getLogger(__name__)
15
16
17
class ThreadLoop(Thread):
    """Thread that creates and runs its own asyncio event loop.

    Other threads hand coroutines to the loop with :meth:`post` and block
    until the result is available.  The instance can be used as a context
    manager: entering starts the thread and registers it as the module-level
    ``_tloop``; leaving stops the loop and joins the thread.
    """

    def __init__(self):
        Thread.__init__(self)
        # Event loop owned by this thread; created in run(), None until then.
        self.loop = None
        # Used by run() to tell start() that ``self.loop`` has been created.
        self._cond = Condition()

    def start(self):
        """Start the thread and block until its event loop has been created."""
        with self._cond:
            Thread.start(self)
            # wait_for() re-checks the predicate after every wake-up, so it
            # cannot return while ``self.loop`` is still unset.
            self._cond.wait_for(lambda: self.loop is not None)

    def run(self):
        """Thread body: create the loop, signal start(), run until stopped."""
        self.loop = asyncio.new_event_loop()
        with self._cond:
            self._cond.notify_all()
        try:
            self.loop.run_forever()
        finally:
            # Release the loop's resources once it has been stopped.
            self.loop.close()

    def stop(self):
        """Ask the event loop to stop; safe to call before start or twice."""
        loop = self.loop
        if loop is None or loop.is_closed():
            return
        try:
            loop.call_soon_threadsafe(loop.stop)
        except RuntimeError:
            # The loop thread closed the loop between the check above and
            # the call; it is already stopped, nothing left to do.
            pass

    def post(self, coro):
        """Run *coro* on the loop thread and return its result.

        Blocks the calling thread until the coroutine has finished; an
        exception raised by the coroutine is re-raised here.  Must not be
        called from the loop thread itself, which would wait on its own work.
        """
        future = asyncio.run_coroutine_threadsafe(coro, loop=self.loop)
        return future.result()

    def __enter__(self):
        self.start()
        # Register this instance as the loop used by the sync wrappers.
        global _tloop
        _tloop = self
        return self

    def __exit__(self, exc_t, exc_v, trace):
        self.stop()
        self.join()
50
51
#@ipcmethod
52
53
# Number of outstanding get_thread_loop() acquisitions of the shared loop.
_ref_count = 0
# Module-level ThreadLoop used by all sync wrappers; None until one is started.
_tloop = None


def start_thread_loop():
    """Start the module-level thread loop and return it.

    If a loop is already registered and its thread is still running, that
    loop is returned instead of being replaced, so the running thread is not
    orphaned without any way to stop it.
    """
    global _tloop
    if _tloop is not None and _tloop.is_alive():
        return _tloop
    _tloop = ThreadLoop()
    _tloop.start()
    return _tloop
62
63
64
def stop_thread_loop():
    """Stop the module-level thread loop and wait for its thread to end.

    Does nothing when no loop is registered.  The module-level reference and
    the reference count are cleared, so a later ``get_thread_loop()`` starts
    a fresh loop instead of returning the stopped one.
    """
    global _tloop, _ref_count
    loop = _tloop
    if loop is None:
        return
    # Unregister first so nobody picks up a loop that is shutting down.
    _tloop = None
    _ref_count = 0
    loop.stop()
    loop.join()
68
69
70
def get_thread_loop():
    """Return the shared thread loop, starting it on first use.

    Each call counts as one acquisition and should be balanced by a call to
    ``release_thread_loop()``.
    """
    global _tloop, _ref_count
    if _tloop is None or not _tloop.is_alive():
        # Nothing registered yet, or the registered loop's thread has ended:
        # earlier acquisitions refer to a loop that no longer runs.
        _ref_count = 0
        start_thread_loop()
    _ref_count += 1
    return _tloop
77
78
79
def release_thread_loop():
    """Give back one acquisition made with ``get_thread_loop()``.

    The shared loop is stopped only when the last acquisition is released
    (or when it was never acquired through ``get_thread_loop()`` at all).
    Does nothing when no loop is registered.
    """
    global _tloop, _ref_count
    if _tloop is None:
        return
    if _ref_count > 0:
        _ref_count -= 1
    if _ref_count > 0:
        # Other users still hold the loop; keep it running.
        return
    stop_thread_loop()
    # Forget the stopped loop so the next get_thread_loop() starts a new one.
    _tloop = None
    _ref_count = 0
87
88
89
def _to_aio(value):
    """Replace sync ``Node`` wrappers in *value* by the objects they wrap.

    Plain lists and tuples are converted element by element (recursively);
    any other value is returned unchanged.
    """
    if isinstance(value, Node):
        return value.aio_obj
    # Exact type check on purpose: subclasses (e.g. named tuples) cannot be
    # rebuilt generically from an iterable, so they are passed through as is.
    if type(value) in (list, tuple):
        return type(value)(_to_aio(item) for item in value)
    return value


def syncmethod(func):
    """Decorator turning a stub method into a blocking call.

    The decorated function's body is never executed.  Instead the method of
    the same name is looked up on ``self.aio_obj``, called with the given
    arguments (sync ``Node`` wrappers replaced by the wrapped objects), and
    the resulting coroutine is run on the module-level thread loop.

    Results that are asyncua nodes, non-empty lists whose first element is a
    node, event generators or subscriptions are wrapped in their sync
    counterparts; everything else is returned unchanged.
    """
    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        aio_args = [_to_aio(arg) for arg in args]
        aio_kwargs = {key: _to_aio(val) for key, val in kwargs.items()}
        aio_func = getattr(self.aio_obj, func.__name__)
        result = _tloop.post(aio_func(*aio_args, **aio_kwargs))
        if isinstance(result, node.Node):
            return Node(result)
        # Only the first element is inspected to decide whether a list holds nodes.
        if isinstance(result, list) and result and isinstance(result[0], node.Node):
            return [Node(i) for i in result]
        if isinstance(result, server.event_generator.EventGenerator):
            return EventGenerator(result)
        if isinstance(result, subscription.Subscription):
            return Subscription(result)
        return result

    return wrapper
112
113
114
class _SubHandler:
115
    def __init__(self, sync_handler):
116
        self.sync_handler = sync_handler
117
118
    def datachange_notification(self, node, val, data):
119
        self.sync_handler.datachange_notification(Node(node), val, data)
120
121
    def event_notification(self, event):
122
        self.sync_handler.event_notification(event)
123
124
125
class Client:
    """Blocking facade over ``asyncua.client.Client``.

    The wrapped client lives in ``aio_obj`` and is bound to the event loop of
    the module-level thread loop, which must be running before a ``Client``
    is created.  Methods decorated with ``syncmethod`` are stubs: the call is
    executed by the same-named method of ``aio_obj`` on the thread loop.
    Usable as a context manager that connects on entry and disconnects on
    exit.
    """

    def __init__(self, url: str, timeout: int = 4):
        aio_client = client.Client(url, timeout, loop=_tloop.loop)
        self.aio_obj = aio_client
        # Sync ``Node`` wrappers for the nodes exposed by the shortcuts helper.
        self.nodes = Shortcuts(aio_client.uaclient)

    def __str__(self):
        return "Sync" + str(self.aio_obj)

    __repr__ = __str__

    @syncmethod
    def connect(self):
        """Blocking wrapper for the wrapped client's ``connect``."""

    @syncmethod
    def disconnect(self):
        """Blocking wrapper for the wrapped client's ``disconnect``."""

    @syncmethod
    def load_type_definitions(self, nodes=None):
        """Blocking wrapper for the wrapped client's ``load_type_definitions``."""

    def create_subscription(self, period, handler):
        """Create a subscription and return it as a sync ``Subscription``.

        *handler* is wrapped in ``_SubHandler`` so that it receives sync
        ``Node`` objects in data change notifications.
        """
        pending = self.aio_obj.create_subscription(period, _SubHandler(handler))
        return Subscription(_tloop.post(pending))

    @syncmethod
    def get_namespace_index(self, url):
        """Blocking wrapper for the wrapped client's ``get_namespace_index``."""

    def get_node(self, nodeid):
        """Return the node for *nodeid* wrapped in a sync ``Node``."""
        aio_node = self.aio_obj.get_node(nodeid)
        return Node(aio_node)

    def __enter__(self):
        self.connect()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.disconnect()
165
166
167
class Shortcuts:
    """Sync counterpart of ``asyncua.common.shortcuts.Shortcuts``.

    Builds the asyncua shortcuts object for *aio_server* and mirrors each of
    its instance attributes on this object, wrapped in a sync ``Node``.
    """

    def __init__(self, aio_server):
        self.aio_obj = shortcuts.Shortcuts(aio_server)
        for attr_name, aio_node in vars(self.aio_obj).items():
            setattr(self, attr_name, Node(aio_node))
172
173
174
class Server:
    """Blocking facade over ``asyncua.server.Server``.

    The wrapped server lives in ``aio_obj``; it is created on, and
    initialised through, the module-level thread loop, which must be running
    before a ``Server`` is created.  Methods decorated with ``syncmethod``
    are stubs: the call is executed by the same-named method of ``aio_obj``
    on the thread loop.  The remaining methods call ``aio_obj`` directly from
    the calling thread.  Usable as a context manager that starts the server
    on entry and stops it on exit.
    """

    def __init__(self, shelf_file=None):
        self.aio_obj = server.Server(loop=_tloop.loop)
        _tloop.post(self.aio_obj.init(shelf_file))
        # Sync ``Node`` wrappers for the nodes exposed by the shortcuts helper.
        self.nodes = Shortcuts(self.aio_obj.iserver.isession)

    def __str__(self):
        return "Sync" + str(self.aio_obj)

    __repr__ = __str__

    def __enter__(self):
        self.start()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.stop()

    def set_endpoint(self, url):
        """Forward to the wrapped server's ``set_endpoint``."""
        return self.aio_obj.set_endpoint(url)

    def set_server_name(self, name):
        """Forward to the wrapped server's ``set_server_name``."""
        return self.aio_obj.set_server_name(name)

    def set_security_policy(self, security_policy):
        """Forward to the wrapped server's ``set_security_policy``."""
        return self.aio_obj.set_security_policy(security_policy)

    def disable_clock(self, boolean):
        """Forward to the wrapped server's ``disable_clock``."""
        return self.aio_obj.disable_clock(boolean)

    @syncmethod
    def register_namespace(self, url):
        """Blocking wrapper for the wrapped server's ``register_namespace``."""

    @syncmethod
    def start(self):
        """Blocking wrapper for the wrapped server's ``start``."""

    @syncmethod
    def stop(self):
        """Blocking wrapper for the wrapped server's ``stop``."""

    def link_method(self, node, callback):
        """Forward to the wrapped server's ``link_method``."""
        return self.aio_obj.link_method(node, callback)

    @syncmethod
    def get_event_generator(self, etype=None, emitting_node=ua.ObjectIds.Server):
        """Blocking wrapper for the wrapped server's ``get_event_generator``."""

    def get_node(self, nodeid):
        """Return the node for *nodeid* wrapped in a sync ``Node``."""
        # Delegate to the wrapped asyncua server; this sync wrapper is not an
        # asyncua server itself and must not be passed in its place.
        return Node(self.aio_obj.get_node(nodeid))

    @syncmethod
    def import_xml(self, path=None, xmlstring=None):
        """Blocking wrapper for the wrapped server's ``import_xml``."""

    @syncmethod
    def get_namespace_index(self, url):
        """Blocking wrapper for the wrapped server's ``get_namespace_index``."""

    @syncmethod
    def load_enums(self):
        """Blocking wrapper for the wrapped server's ``load_enums``."""

    @syncmethod
    def load_type_definitions(self):
        """Blocking wrapper for the wrapped server's ``load_type_definitions``."""

    def set_attribute_value(self, nodeid, datavalue, attr=ua.AttributeIds.Value):
        """Forward to the wrapped server's ``set_attribute_value``."""
        return self.aio_obj.set_attribute_value(nodeid, datavalue, attr)
244
245
246
class EventGenerator:
    """Sync wrapper around an asyncua event generator (kept in ``aio_obj``)."""

    def __init__(self, aio_evgen):
        self.aio_obj = aio_evgen

    def __str__(self):
        # Same "Sync" prefix convention as the other wrappers in this module.
        return "Sync" + str(self.aio_obj)

    __repr__ = __str__

    @property
    def event(self):
        """The ``event`` attribute of the wrapped generator."""
        return self.aio_obj.event

    def trigger(self, time=None, message=None):
        """Call the wrapped generator's ``trigger`` and return its result.

        The call is made directly from the calling thread, not through the
        thread loop.
        """
        # NOTE(review): if the wrapped ``trigger`` is a coroutine function the
        # caller receives an un-awaited coroutine here - confirm against the
        # asyncua version in use.
        return self.aio_obj.trigger(time, message)
256
257
258
class Node:
    """Sync wrapper around an asyncua node (kept in ``aio_obj``).

    Methods decorated with ``syncmethod`` are stubs: the call is executed by
    the same-named method of ``aio_obj`` on the module-level thread loop, and
    the declared parameters only document the expected arguments.  Equality
    and hashing are delegated to the wrapped node.
    """

    def __init__(self, aio_node):
        self.aio_obj = aio_node

    def __hash__(self):
        return hash(self.aio_obj)

    def __str__(self):
        return "Sync" + str(self.aio_obj)

    __repr__ = __str__

    @property
    def nodeid(self):
        """The ``nodeid`` attribute of the wrapped node."""
        return self.aio_obj.nodeid

    @syncmethod
    def get_browse_name(self):
        """Blocking wrapper for the wrapped node's ``get_browse_name``."""

    @syncmethod
    def get_children(self, refs=ua.ObjectIds.HierarchicalReferences, nodeclassmask=ua.NodeClass.Unspecified):
        """Blocking wrapper for the wrapped node's ``get_children``."""

    @syncmethod
    def get_children_descriptions(self,
                                  refs=ua.ObjectIds.HierarchicalReferences,
                                  nodeclassmask=ua.NodeClass.Unspecified,
                                  includesubtypes=True):
        """Blocking wrapper for the wrapped node's ``get_children_descriptions``."""

    @syncmethod
    def get_child(self, path):
        """Blocking wrapper for the wrapped node's ``get_child``."""

    @syncmethod
    def set_modelling_rule(self, mandatory: bool):
        """Blocking wrapper for the wrapped node's ``set_modelling_rule``."""

    @syncmethod
    def add_variable(self, ns, name, val):
        """Blocking wrapper for the wrapped node's ``add_variable``."""

    @syncmethod
    def add_property(self, ns, name, val):
        """Blocking wrapper for the wrapped node's ``add_property``."""

    @syncmethod
    def add_object(self, ns, name):
        """Blocking wrapper for the wrapped node's ``add_object``."""

    @syncmethod
    def add_object_type(self, ns, name):
        """Blocking wrapper for the wrapped node's ``add_object_type``."""

    @syncmethod
    def add_folder(self, ns, name):
        """Blocking wrapper for the wrapped node's ``add_folder``."""

    @syncmethod
    def add_method(self, *args):
        """Blocking wrapper for the wrapped node's ``add_method``."""

    @syncmethod
    def set_writable(self, writable=True):
        """Blocking wrapper for the wrapped node's ``set_writable``."""

    @syncmethod
    def set_value(self, val):
        """Blocking wrapper for the wrapped node's ``set_value``."""

    @syncmethod
    def get_value(self):
        """Blocking wrapper for the wrapped node's ``get_value``."""

    @syncmethod
    def call_method(self, methodid, *args):
        """Blocking wrapper for the wrapped node's ``call_method``."""

    def __eq__(self, other):
        # Let Python fall back to its default handling for foreign types
        # instead of failing on a missing ``aio_obj`` attribute.
        if not isinstance(other, Node):
            return NotImplemented
        return self.aio_obj == other.aio_obj
339
340
341
class Subscription:
    """Sync wrapper around an asyncua subscription (kept in ``aio_obj``).

    All methods are ``syncmethod`` stubs: the call is executed by the
    same-named method of ``aio_obj`` on the module-level thread loop.
    """

    def __init__(self, sub):
        self.aio_obj = sub

    def __str__(self):
        # Same "Sync" prefix convention as the other wrappers in this module.
        return "Sync" + str(self.aio_obj)

    __repr__ = __str__

    @syncmethod
    def subscribe_data_change(self, nodes, attr=ua.AttributeIds.Value, queuesize=0):
        """Blocking wrapper for the wrapped subscription's ``subscribe_data_change``."""

    @syncmethod
    def subscribe_events(self,
                         sourcenode=ua.ObjectIds.Server,
                         evtypes=ua.ObjectIds.BaseEventType,
                         evfilter=None,
                         queuesize=0):
        """Blocking wrapper for the wrapped subscription's ``subscribe_events``."""

    @syncmethod
    def unsubscribe(self, handle):
        """Blocking wrapper for the wrapped subscription's ``unsubscribe``."""

    @syncmethod
    def create_monitored_items(self, monitored_items):
        """Blocking wrapper for the wrapped subscription's ``create_monitored_items``."""

    @syncmethod
    def delete(self):
        """Blocking wrapper for the wrapped subscription's ``delete``."""
368