Completed
Pull Request — master (#91)
by Olivier
41:49 queued 36:14
created

asyncua.sync.Server.__str__()   A

Complexity

Conditions 1

Size

Total Lines 2
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 2
nop 1
dl 0
loc 2
rs 10
c 0
b 0
f 0
1
"""
2
sync API of asyncua
3
"""
4
import asyncio
5
from threading import Thread, Condition
6
import logging
7
8
from asyncua import ua
9
from asyncua import client
10
from asyncua import server
11
from asyncua.common import node
12
from asyncua.common import subscription, shortcuts
13
14
logger = logging.getLogger(__name__)
15
16
17
class ThreadLoopNotRunning(Exception):
18
    pass
19
20
21
class ThreadLoop(Thread):
22
    def __init__(self):
23
        Thread.__init__(self)
24
        self.loop = None
25
        self._cond = Condition()
26
27
    def start(self):
28
        Thread.start(self)
29
        with self._cond:
30
            self._cond.wait()
31
32
    def run(self):
33
        self.loop = asyncio.new_event_loop()
34
        self.loop.call_soon_threadsafe(self._notify_start)
35
        self.loop.run_forever()
36
37
    def _notify_start(self):
38
        with self._cond:
39
            self._cond.notify_all()
40
41
    def stop(self):
42
        self.loop.call_soon_threadsafe(self.loop.stop)
43
44
    def post(self, coro):
45
        if not self.loop or not self.loop.is_running():
46
            raise ThreadLoopNotRunning(f"could not post {coro}")
47
        futur = asyncio.run_coroutine_threadsafe(coro, loop=self.loop)
48
        return futur.result()
49
50
    def __enter__(self):
51
        self.start()
52
        return self
53
54
    def __exit__(self, exc_t, exc_v, trace):
55
        self.stop()
56
        self.join()
57
58
59
def syncmethod(func):
60
    def wrapper(self, *args, **kwargs):
61
        args = list(args)  #FIXME: might be very inefficient...
0 ignored issues
show
Coding Style introduced by
TODO and FIXME comments should generally be avoided.
Loading history...
62
        for idx, arg in enumerate(args):
63
            if isinstance(arg, Node):
64
                args[idx] = arg.aio_obj
65
        for k, v in kwargs.items():
66
            if isinstance(v, Node):
67
                kwargs[k] = v.aio_obj
68
        aio_func = getattr(self.aio_obj, func.__name__)
69
        result = self.tloop.post(aio_func(*args, **kwargs))
70
        if isinstance(result, node.Node):
71
            return Node(self.tloop, result)
72
        if isinstance(result, list) and len(result) > 0 and isinstance(result[0], node.Node):
73
            return [Node(self.tloop, i) for i in result]
74
        if isinstance(result, server.event_generator.EventGenerator):
75
            return EventGenerator(result)
76
        if isinstance(result, subscription.Subscription):
77
            return Subscription(self.tloop, result)
78
        return result
79
80
    return wrapper
81
82
83
class _SubHandler:
84
    def __init__(self, tloop, sync_handler):
85
        self.tloop = tloop
86
        self.sync_handler = sync_handler
87
88
    def datachange_notification(self, node, val, data):
89
        self.sync_handler.datachange_notification(Node(self.tloop, node), val, data)
90
91
    def event_notification(self, event):
92
        self.sync_handler.event_notification(event)
93
94
95
class Client:
96
    def __init__(self, url: str, timeout: int = 4, tloop=None):
97
        self.tloop = tloop
98
        self.close_tloop = False
99
        if not self.tloop:
100
            self.tloop = ThreadLoop()
101
            self.tloop.start()
102
            self.close_tloop = True
103
        self.aio_obj = client.Client(url, timeout, loop=self.tloop.loop)
104
        self.nodes = Shortcuts(self.tloop, self.aio_obj.uaclient)
105
106
    def __str__(self):
107
        return "Sync" + self.aio_obj.__str__()
108
    __repr__ = __str__
109
110
    @syncmethod
111
    def connect(self):
112
        pass
113
114
    def disconnect(self):
115
        self.tloop.post(self.aio_obj.disconnect())
116
        if self.close_tloop:
117
            self.tloop.stop()
118
119
    @syncmethod
120
    def load_type_definitions(self, nodes=None):
121
        pass
122
123
    @syncmethod
124
    def load_enums(self):
125
        pass
126
127
    def create_subscription(self, period, handler):
128
        coro = self.aio_obj.create_subscription(period, _SubHandler(self.tloop, handler))
129
        aio_sub = self.tloop.post(coro)
130
        return Subscription(self.tloop, aio_sub)
131
132
    @syncmethod
133
    def get_namespace_index(self, url):
134
        pass
135
136
    def get_node(self, nodeid):
137
        return Node(self.tloop, self.aio_obj.get_node(nodeid))
138
139
    def __enter__(self):
140
        self.connect()
141
        return self
142
143
    def __exit__(self, exc_type, exc_value, traceback):
144
        self.disconnect()
145
146
147
class Shortcuts:
148
    def __init__(self, tloop, aio_server):
149
        self.tloop = tloop
150
        self.aio_obj = shortcuts.Shortcuts(aio_server)
151
        for k, v in self.aio_obj.__dict__.items():
152
            setattr(self, k, Node(self.tloop, v))
153
154
155
class Server:
156
    def __init__(self, shelf_file=None, tloop=None):
157
        self.tloop = tloop
158
        self.close_tloop = False
159
        if not self.tloop:
160
            self.tloop = ThreadLoop()
161
            self.tloop.start()
162
            self.close_tloop = True
163
        self.aio_obj = server.Server(loop=self.tloop.loop)
164
        self.tloop.post(self.aio_obj.init(shelf_file))
165
        self.nodes = Shortcuts(self.tloop, self.aio_obj.iserver.isession)
166
167
    def __str__(self):
168
        return "Sync" + self.aio_obj.__str__()
169
    __repr__ = __str__
170
171
    def __enter__(self):
172
        self.start()
173
        return self
174
175
    def __exit__(self, exc_type, exc_value, traceback):
176
        self.stop()
177
178
    def set_endpoint(self, url):
179
        return self.aio_obj.set_endpoint(url)
180
181
    def set_server_name(self, name):
182
        return self.aio_obj.set_server_name(name)
183
184
    def set_security_policy(self, security_policy):
185
        return self.aio_obj.set_security_policy(security_policy)
186
187
    def disable_clock(self, boolean):
188
        return self.aio_obj.disable_clock(boolean)
189
190
    @syncmethod
191
    def register_namespace(self, url):
192
        return self.aio_obj.register_namespace(url)
193
194
    @syncmethod
195
    def start(self):
196
        pass
197
198
    def stop(self):
199
        self.tloop.post(self.aio_obj.stop())
200
        if self.close_tloop:
201
            self.tloop.stop()
202
203
    def link_method(self, node, callback):
204
        return self.aio_obj.link_method(node, callback)
205
206
    @syncmethod
207
    def get_event_generator(self, etype=None, emitting_node=ua.ObjectIds.Server):
208
        pass
209
210
    def get_node(self, nodeid):
211
        return Node(self.tloop, server.Server.get_node(self, nodeid))
212
213
    @syncmethod
214
    def import_xml(self, path=None, xmlstring=None):
215
        pass
216
217
    @syncmethod
218
    def get_namespace_index(self, url):
219
        pass
220
221
    @syncmethod
222
    def load_enums(self):
223
        pass
224
225
    @syncmethod
226
    def load_type_definitions(self):
227
        pass
228
229
    def set_attribute_value(self, nodeid, datavalue, attr=ua.AttributeIds.Value):
230
        return self.aio_obj.set_attribute_value(nodeid, datavalue, attr)
231
232
233
class EventGenerator:
234
    def __init__(self, aio_evgen):
235
        self.aio_obj = aio_evgen
236
237
    @property
238
    def event(self):
239
        return self.aio_obj.event
240
241
    def trigger(self, time=None, message=None):
242
        return self.aio_obj.trigger(time, message)
243
244
245
class Node:
246
    def __init__(self, tloop, aio_node):
247
        self.aio_obj = aio_node
248
        self.tloop = tloop
249
250
    def __hash__(self):
251
        return self.aio_obj.__hash__()
252
253
    def __str__(self):
254
        return "Sync" + self.aio_obj.__str__()
255
    __repr__ = __str__
256
257
    @property
258
    def nodeid(self):
259
        return self.aio_obj.nodeid
260
261
    @syncmethod
262
    def get_browse_name(self):
263
        pass
264
265
    @syncmethod
266
    def get_children(self, refs=ua.ObjectIds.HierarchicalReferences, nodeclassmask=ua.NodeClass.Unspecified):
0 ignored issues
show
Coding Style introduced by
This line is too long as per the coding-style (109/100).

This check looks for lines that are too long. You can specify the maximum line length.

Loading history...
267
        pass
268
269
    @syncmethod
270
    def get_children_descriptions(self,
271
                                  refs=ua.ObjectIds.HierarchicalReferences,
272
                                  nodeclassmask=ua.NodeClass.Unspecified,
273
                                  includesubtypes=True):
274
        pass
275
276
    @syncmethod
277
    def get_child(self, path):
278
        pass
279
280
    @syncmethod
281
    def set_modelling_rule(self, mandatory: bool):
282
        pass
283
284
    @syncmethod
285
    def add_variable(self, ns, name, val):
286
        pass
287
288
    @syncmethod
289
    def add_property(self, ns, name, val):
290
        pass
291
292
    @syncmethod
293
    def add_object(self, ns, name):
294
        pass
295
296
    @syncmethod
297
    def add_object_type(self, ns, name):
298
        pass
299
300
    @syncmethod
301
    def add_folder(self, ns, name):
302
        pass
303
304
    @syncmethod
305
    def add_method(self, *args):
306
        pass
307
308
    @syncmethod
309
    def set_writable(self, writable=True):
310
        pass
311
312
    @syncmethod
313
    def set_value(self, val):
314
        pass
315
316
    @syncmethod
317
    def get_value(self, val):
318
        pass
319
320
    @syncmethod
321
    def call_method(self, methodid, *args):
322
        pass
323
324
    def __eq__(self, other):
325
        return self.aio_obj == other.aio_obj
326
327
328
class Subscription:
329
    def __init__(self, tloop, sub):
330
        self.tloop = tloop
331
        self.aio_obj = sub
332
333
    @syncmethod
334
    def subscribe_data_change(self, nodes, attr=ua.AttributeIds.Value, queuesize=0):
335
        pass
336
337
    @syncmethod
338
    def subscribe_events(self,
339
                         sourcenode=ua.ObjectIds.Server,
340
                         evtypes=ua.ObjectIds.BaseEventType,
341
                         evfilter=None,
342
                         queuesize=0):
343
        pass
344
345
    @syncmethod
346
    def unsubscribe(self, handle):
347
        pass
348
349
    @syncmethod
350
    async def create_monitored_items(self, monitored_items):
351
        pass
352
353
    @syncmethod
354
    def delete(self):
355
        pass
356