Test Failed
Pull Request — master (#51)
by Olivier
05:01
created

asyncua.sync.Server.__init__()   A

Complexity

Conditions 2

Size

Total Lines 10
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 10
nop 3
dl 0
loc 10
rs 9.9
c 0
b 0
f 0
1
"""
2
sync API of asyncua
3
"""
4
import asyncio
5
from threading import Thread, Condition
6
import logging
7
8
from asyncua import ua
9
from asyncua import client
10
from asyncua import server
11
from asyncua.common import node
12
from asyncua.common import subscription, shortcuts
13
14
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
15
16
17
class ThreadLoop(Thread):
    """Thread that creates an asyncio event loop and runs it until stopped.

    Other threads submit coroutines with :meth:`post`, which blocks the
    caller until the coroutine has finished on the loop.  The class is also a
    context manager: entering starts the thread, leaving stops the loop and
    joins the thread.
    """

    def __init__(self):
        Thread.__init__(self)
        self.loop = None  # set by run(), on the new thread
        self._cond = Condition()

    def start(self):
        """Start the thread and block until its event loop has been created."""
        # The lock is taken *before* the thread is started: run() cannot
        # enter its own ``with self._cond`` (and notify) until wait_for()
        # has released the lock, so the notification can never be missed.
        with self._cond:
            Thread.start(self)
            self._cond.wait_for(lambda: self.loop is not None)

    def run(self):
        """Thread body: create the loop, signal start(), then run forever."""
        self.loop = asyncio.new_event_loop()
        with self._cond:
            self._cond.notify_all()
        self.loop.run_forever()

    def stop(self):
        """Ask the loop to stop; safe to call from any thread."""
        self.loop.call_soon_threadsafe(self.loop.stop)

    def post(self, coro):
        """Run *coro* on the loop and return its result, blocking the caller.

        An exception raised by the coroutine is re-raised here.
        """
        future = asyncio.run_coroutine_threadsafe(coro, loop=self.loop)
        return future.result()

    def __enter__(self):
        self.start()
        return self

    def __exit__(self, exc_t, exc_v, trace):
        self.stop()
        self.join()
48
49
50
def syncmethod(func):
    """Decorator turning a stub method into a blocking call of its async twin.

    The body of the decorated function is never executed; only its name is
    used, to look up the method of the same name on ``self.aio_obj``.  The
    coroutine that method returns is run on ``self.tloop`` and the caller
    blocks until the result is available.

    Sync ``Node`` wrappers passed as arguments (directly, or one level deep
    inside a list or tuple) are replaced by the async objects they wrap.
    Async nodes, lists of async nodes, event generators and subscriptions in
    the result are wrapped in their sync counterparts; anything else is
    returned unchanged.
    """
    from functools import wraps  # local import keeps this block self-contained

    def to_async(value):
        # Unwrap a sync Node, or the sync Nodes held in a list/tuple.
        if isinstance(value, Node):
            return value.aio_obj
        if isinstance(value, (list, tuple)) and any(isinstance(item, Node) for item in value):
            converted = [item.aio_obj if isinstance(item, Node) else item for item in value]
            return converted if isinstance(value, list) else tuple(converted)
        return value

    @wraps(func)  # keep the stub's name and docstring on the wrapper
    def wrapper(self, *args, **kwargs):
        aio_args = [to_async(arg) for arg in args]
        aio_kwargs = {key: to_async(val) for key, val in kwargs.items()}
        aio_func = getattr(self.aio_obj, func.__name__)
        result = self.tloop.post(aio_func(*aio_args, **aio_kwargs))
        if isinstance(result, node.Node):
            return Node(self.tloop, result)
        # Only the first element is inspected to decide whether this is a list of nodes.
        if isinstance(result, list) and result and isinstance(result[0], node.Node):
            return [Node(self.tloop, item) for item in result]
        if isinstance(result, server.event_generator.EventGenerator):
            return EventGenerator(result)
        if isinstance(result, subscription.Subscription):
            return Subscription(self.tloop, result)
        return result

    return wrapper
72
73
74
class _SubHandler:
75
    def __init__(self, tloop, sync_handler):
76
        self.tloop = tloop
77
        self.sync_handler = sync_handler
78
79
    def datachange_notification(self, node, val, data):
80
        self.sync_handler.datachange_notification(Node(self.tloop, node), val, data)
81
82
    def event_notification(self, event):
83
        self.sync_handler.event_notification(event)
84
85
86
class Client:
    """Blocking facade over the asyncio ``client.Client``.

    The async client lives on the event loop of a ``ThreadLoop``.  When no
    *tloop* is passed in, a private one is created and started, and it is
    stopped again by :meth:`disconnect`.  Methods decorated with
    ``syncmethod`` are stubs: the call is forwarded to the method of the same
    name on ``self.aio_obj``.
    """

    def __init__(self, url: str, timeout: int = 4, tloop=None):
        self.tloop = tloop
        # True only when this instance created the loop thread itself.
        self.close_tloop = False
        if not self.tloop:
            self.tloop = ThreadLoop()
            self.tloop.start()
            self.close_tloop = True
        self.aio_obj = client.Client(url, timeout, loop=self.tloop.loop)
        self.nodes = Shortcuts(self.tloop, self.aio_obj.uaclient)

    def __str__(self):
        return "Sync" + self.aio_obj.__str__()

    __repr__ = __str__

    @syncmethod
    def connect(self):
        pass

    def disconnect(self):
        """Disconnect the async client and stop a privately owned loop thread."""
        try:
            self.tloop.post(self.aio_obj.disconnect())
        finally:
            # Stop our own loop even when the disconnect itself raised,
            # otherwise the loop thread would be left running.
            if self.close_tloop:
                self.tloop.stop()

    @syncmethod
    def load_type_definitions(self, nodes=None):
        pass

    @syncmethod
    def load_enums(self):
        pass

    def create_subscription(self, period, handler):
        """Create a subscription whose notifications go to the sync *handler*."""
        coro = self.aio_obj.create_subscription(period, _SubHandler(self.tloop, handler))
        aio_sub = self.tloop.post(coro)
        return Subscription(self.tloop, aio_sub)

    @syncmethod
    def get_namespace_index(self, url):
        pass

    def get_node(self, nodeid):
        """Return a sync ``Node`` wrapping the async client's node for *nodeid*."""
        return Node(self.tloop, self.aio_obj.get_node(nodeid))

    def __enter__(self):
        self.connect()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.disconnect()
136
137
138
class Shortcuts:
    """Sync mirror of ``shortcuts.Shortcuts``.

    Every instance attribute of the async shortcuts object is re-exposed
    under the same name, wrapped in a sync ``Node``.
    """

    def __init__(self, tloop, aio_server):
        self.tloop = tloop
        self.aio_obj = shortcuts.Shortcuts(aio_server)
        for name, aio_node in vars(self.aio_obj).items():
            setattr(self, name, Node(self.tloop, aio_node))
144
145
146
class Server:
    """Blocking facade over the asyncio ``server.Server``.

    The async server lives on the event loop of a ``ThreadLoop``.  When no
    *tloop* is passed in, a private one is created and started, and it is
    stopped again by :meth:`stop`.  Methods decorated with ``syncmethod`` are
    stubs: the call is forwarded to the method of the same name on
    ``self.aio_obj``.
    """

    def __init__(self, shelf_file=None, tloop=None):
        self.tloop = tloop
        # True only when this instance created the loop thread itself.
        self.close_tloop = False
        if not self.tloop:
            self.tloop = ThreadLoop()
            self.tloop.start()
            self.close_tloop = True
        self.aio_obj = server.Server(loop=self.tloop.loop)
        self.tloop.post(self.aio_obj.init(shelf_file))
        self.nodes = Shortcuts(self.tloop, self.aio_obj.iserver.isession)

    def __str__(self):
        return "Sync" + self.aio_obj.__str__()

    __repr__ = __str__

    def __enter__(self):
        self.start()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.stop()

    def set_endpoint(self, url):
        return self.aio_obj.set_endpoint(url)

    def set_server_name(self, name):
        return self.aio_obj.set_server_name(name)

    def set_security_policy(self, security_policy):
        return self.aio_obj.set_security_policy(security_policy)

    def disable_clock(self, boolean):
        return self.aio_obj.disable_clock(boolean)

    @syncmethod
    def register_namespace(self, url):
        # Stub like the other syncmethod methods: the decorator never runs
        # this body, it forwards to ``self.aio_obj.register_namespace``.
        pass

    @syncmethod
    def start(self):
        pass

    def stop(self):
        """Stop the async server and a privately owned loop thread."""
        try:
            self.tloop.post(self.aio_obj.stop())
        finally:
            # Stop our own loop even when stopping the server raised,
            # otherwise the loop thread would be left running.
            if self.close_tloop:
                self.tloop.stop()

    def link_method(self, node, callback):
        return self.aio_obj.link_method(node, callback)

    @syncmethod
    def get_event_generator(self, etype=None, emitting_node=ua.ObjectIds.Server):
        pass

    def get_node(self, nodeid):
        """Return a sync ``Node`` wrapping the async server's node for *nodeid*."""
        # Ask the wrapped async server, as Client.get_node does; this sync
        # wrapper is not itself a ``server.Server`` instance.
        return Node(self.tloop, self.aio_obj.get_node(nodeid))

    @syncmethod
    def import_xml(self, path=None, xmlstring=None):
        pass

    @syncmethod
    def get_namespace_index(self, url):
        pass

    @syncmethod
    def load_enums(self):
        pass

    @syncmethod
    def load_type_definitions(self):
        pass

    def set_attribute_value(self, nodeid, datavalue, attr=ua.AttributeIds.Value):
        return self.aio_obj.set_attribute_value(nodeid, datavalue, attr)
222
223
224
class EventGenerator:
    """Thin sync wrapper around an async event generator object."""

    def __init__(self, aio_evgen):
        self.aio_obj = aio_evgen

    @property
    def event(self):
        """The ``event`` attribute of the wrapped generator."""
        return self.aio_obj.event

    def trigger(self, time=None, message=None):
        """Call ``trigger`` on the wrapped generator and return its result."""
        # NOTE(review): the result is returned as-is and is not run on a
        # thread loop; if the wrapped ``trigger`` is a coroutine function the
        # caller receives an un-awaited coroutine - confirm against asyncua.
        return self.aio_obj.trigger(time, message)
234
235
236
class Node:
    """Sync wrapper around an async node object.

    Methods decorated with ``syncmethod`` are stubs: their bodies never run,
    and the call is forwarded to the method of the same name on
    ``self.aio_obj`` via ``self.tloop``.  Equality and hashing delegate to
    the wrapped async node.
    """

    def __init__(self, tloop, aio_node):
        self.aio_obj = aio_node
        self.tloop = tloop

    def __hash__(self):
        return self.aio_obj.__hash__()

    def __str__(self):
        return "Sync" + self.aio_obj.__str__()

    __repr__ = __str__

    @property
    def nodeid(self):
        return self.aio_obj.nodeid

    @syncmethod
    def get_browse_name(self):
        pass

    @syncmethod
    def get_children(self,
                     refs=ua.ObjectIds.HierarchicalReferences,
                     nodeclassmask=ua.NodeClass.Unspecified):
        pass

    @syncmethod
    def get_children_descriptions(self,
                                  refs=ua.ObjectIds.HierarchicalReferences,
                                  nodeclassmask=ua.NodeClass.Unspecified,
                                  includesubtypes=True):
        pass

    @syncmethod
    def get_child(self, path):
        pass

    @syncmethod
    def set_modelling_rule(self, mandatory: bool):
        pass

    @syncmethod
    def add_variable(self, ns, name, val):
        pass

    @syncmethod
    def add_property(self, ns, name, val):
        pass

    @syncmethod
    def add_object(self, ns, name):
        pass

    @syncmethod
    def add_object_type(self, ns, name):
        pass

    @syncmethod
    def add_folder(self, ns, name):
        pass

    @syncmethod
    def add_method(self, *args):
        pass

    @syncmethod
    def set_writable(self, writable=True):
        pass

    @syncmethod
    def set_value(self, val):
        pass

    @syncmethod
    def get_value(self):
        # A getter takes no value; the decorator forwards whatever arguments
        # the caller passes, so this stub signature is documentation only.
        pass

    @syncmethod
    def call_method(self, methodid, *args):
        pass

    def __eq__(self, other):
        # Comparing with something that is not a sync Node (e.g. None) must
        # not raise; let Python fall back to its default handling.
        if not isinstance(other, Node):
            return NotImplemented
        return self.aio_obj == other.aio_obj
317
318
319
class Subscription:
    """Sync wrapper around an async subscription object.

    Every method is a ``syncmethod`` stub: the body never runs, and the call
    is forwarded to the method of the same name on ``self.aio_obj`` via
    ``self.tloop``.
    """

    def __init__(self, tloop, sub):
        self.tloop = tloop
        self.aio_obj = sub

    @syncmethod
    def subscribe_data_change(self, nodes, attr=ua.AttributeIds.Value, queuesize=0):
        pass

    @syncmethod
    def subscribe_events(self,
                         sourcenode=ua.ObjectIds.Server,
                         evtypes=ua.ObjectIds.BaseEventType,
                         evfilter=None,
                         queuesize=0):
        pass

    @syncmethod
    def unsubscribe(self, handle):
        pass

    @syncmethod
    def create_monitored_items(self, monitored_items):
        # Plain ``def`` like every other stub: the decorator supplies the
        # blocking call, so the stub itself must not be a coroutine function.
        pass

    @syncmethod
    def delete(self):
        pass
347