Test Failed
Pull Request — master (#51)
by Olivier
04:49
created

asyncua.sync.ThreadLoop.__enter__()   A

Complexity

Conditions 1

Size

Total Lines 3
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 3
nop 1
dl 0
loc 3
rs 10
c 0
b 0
f 0
1
"""
2
sync API of asyncua
3
"""
4
import asyncio
5
from threading import Thread, Condition
6
import logging
7
8
from asyncua import ua
9
from asyncua import client
10
from asyncua import server
11
from asyncua.common import node
12
from asyncua.common import subscription, shortcuts
13
14
logger = logging.getLogger(__name__)
15
16
17
class ThreadLoop(Thread):
    """
    Thread that owns an asyncio event loop.

    Other threads hand coroutines to the loop with post() and block until
    the result is available. Can be used as a context manager: entering
    starts the thread, leaving stops the loop and joins the thread.
    """

    def __init__(self):
        Thread.__init__(self)
        self.loop = None  # created by run(), inside the new thread
        self._cond = Condition()  # lets run() tell start() that the loop exists

    def start(self):
        """
        Start the thread and block until its event loop has been created.
        """
        # The condition is acquired *before* the thread is started: run()
        # needs the same condition to notify, so the notification cannot be
        # sent before this thread is actually waiting. wait_for() re-checks
        # the predicate, which also makes a spurious wake-up harmless.
        with self._cond:
            Thread.start(self)
            self._cond.wait_for(lambda: self.loop is not None)

    def run(self):
        """
        Thread body: create the loop, signal start(), run until stop().
        """
        loop = asyncio.new_event_loop()
        with self._cond:
            self.loop = loop
            self._cond.notify_all()
        try:
            loop.run_forever()
        finally:
            # The loop belongs to this thread; release its resources once
            # it has been stopped.
            loop.close()

    def stop(self):
        """
        Ask the loop to stop. Does not wait for the thread to finish.
        """
        self.loop.call_soon_threadsafe(self.loop.stop)

    def post(self, coro):
        """
        Run coroutine `coro` in the loop thread and return its result.

        Blocks the calling thread until the coroutine is done; an exception
        raised by the coroutine is re-raised here.
        """
        future = asyncio.run_coroutine_threadsafe(coro, loop=self.loop)
        return future.result()

    def __enter__(self):
        self.start()
        return self

    def __exit__(self, exc_t, exc_v, trace):
        self.stop()
        self.join()
49
50
51
def _unwrap_node(value):
    """
    Replace sync Node wrappers by the asyncio objects they wrap.

    Handles a Node itself and a list/tuple that contains Nodes (one level
    deep). Any other value, including a sequence without Nodes, is returned
    unchanged.
    """
    if isinstance(value, Node):
        return value.aio_obj
    if isinstance(value, (list, tuple)) and any(isinstance(item, Node) for item in value):
        unwrapped = [item.aio_obj if isinstance(item, Node) else item for item in value]
        return unwrapped if isinstance(value, list) else tuple(unwrapped)
    return value


def syncmethod(func):
    """
    Decorator turning a stub method into a blocking call of the asyncio method
    with the same name.

    The decorated function body is never executed; only its name is used to
    look up the method on `self.aio_obj`. The resulting coroutine is run
    through `self.tloop.post()`, and asyncio nodes, lists of nodes, event
    generators and subscriptions in the result are wrapped in their sync
    counterparts.
    """
    # Imported locally so this block does not depend on the module's import list.
    from functools import wraps

    @wraps(func)
    def wrapper(self, *args, **kwargs):
        # The asyncio API must receive asyncio nodes, not sync wrappers.
        aio_args = [_unwrap_node(arg) for arg in args]
        aio_kwargs = {key: _unwrap_node(value) for key, value in kwargs.items()}
        aio_func = getattr(self.aio_obj, func.__name__)
        logger.debug("calling %s args=%s kwargs=%s", aio_func, aio_args, aio_kwargs)
        result = self.tloop.post(aio_func(*aio_args, **aio_kwargs))
        logger.debug("%s returned %r", func.__name__, result)
        if isinstance(result, node.Node):
            return Node(self.tloop, result)
        # Only the first element is inspected to decide whether it is a list of nodes.
        if isinstance(result, list) and result and isinstance(result[0], node.Node):
            return [Node(self.tloop, i) for i in result]
        if isinstance(result, server.event_generator.EventGenerator):
            return EventGenerator(result)
        if isinstance(result, subscription.Subscription):
            return Subscription(self.tloop, result)
        return result

    return wrapper
75
76
77
class _SubHandler:
    """
    Adapter placed between an asyncio subscription and a user supplied
    sync handler: it forwards notifications to `sync_handler`.

    NOTE(review): these callbacks are presumably invoked from the event loop
    thread; a handler that calls a blocking sync method from inside a
    notification would then wait on its own loop - confirm.
    """

    def __init__(self, tloop, sync_handler):
        self.sync_handler = sync_handler
        self.tloop = tloop

    def datachange_notification(self, node, val, data):
        """
        Forward a data change, handing the node over as a sync Node.
        """
        notify = self.sync_handler.datachange_notification
        notify(Node(self.tloop, node), val, data)

    def event_notification(self, event):
        """
        Forward an event unchanged.
        """
        notify = self.sync_handler.event_notification
        notify(event)
87
88
89
class Client:
    """
    Blocking wrapper around the asyncio `client.Client`.

    Methods decorated with `syncmethod` are stubs: the call is forwarded to
    the method of the same name on `self.aio_obj` and executed in the loop
    thread `self.tloop`.

    If no `tloop` is passed, the client creates and starts its own ThreadLoop
    and stops it again on disconnect().
    """

    def __init__(self, url: str, timeout: int = 4, tloop=None):
        """
        :param url: passed unchanged to the asyncio client
        :param timeout: passed unchanged to the asyncio client (presumably seconds - confirm)
        :param tloop: an already running ThreadLoop to share, or None to create one
        """
        self.tloop = tloop
        self.close_tloop = False  # True only when the loop thread is owned by this object
        if not self.tloop:
            self.tloop = ThreadLoop()
            self.tloop.start()
            self.close_tloop = True
        try:
            self.aio_obj = client.Client(url, timeout, loop=self.tloop.loop)
            self.nodes = Shortcuts(self.tloop, self.aio_obj.uaclient)
        except Exception:
            # Nobody gets a reference to a half-built client, so an owned loop
            # thread (non-daemon) must be stopped here or it would run forever.
            if self.close_tloop:
                self.tloop.stop()
            raise

    def __str__(self):
        return "Sync" + self.aio_obj.__str__()
    __repr__ = __str__

    @syncmethod
    def connect(self):
        """Blocking call to the asyncio client's `connect`."""

    def disconnect(self):
        """
        Disconnect the asyncio client and stop the loop thread if it is owned.

        The owned loop is stopped even when disconnecting raises.
        """
        try:
            self.tloop.post(self.aio_obj.disconnect())
        finally:
            if self.close_tloop:
                self.tloop.stop()

    @syncmethod
    def load_type_definitions(self, nodes=None):
        """Blocking call to the asyncio client's `load_type_definitions`."""

    @syncmethod
    def load_enums(self):
        """Blocking call to the asyncio client's `load_enums`."""

    def create_subscription(self, period, handler):
        """
        Create a subscription and return it as a sync Subscription.

        `handler` is wrapped in a _SubHandler so that it receives sync Nodes.
        """
        coro = self.aio_obj.create_subscription(period, _SubHandler(self.tloop, handler))
        aio_sub = self.tloop.post(coro)
        return Subscription(self.tloop, aio_sub)

    @syncmethod
    def get_namespace_index(self, url):
        """Blocking call to the asyncio client's `get_namespace_index`."""

    def get_node(self, nodeid):
        """Return a sync Node wrapping the asyncio client's `get_node(nodeid)`."""
        return Node(self.tloop, self.aio_obj.get_node(nodeid))

    def __enter__(self):
        try:
            self.connect()
        except Exception:
            # __exit__ is not called when __enter__ fails; stop an owned loop
            # thread here so it does not keep the process alive.
            if self.close_tloop:
                self.tloop.stop()
            raise
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.disconnect()
139
140
141
class Shortcuts:
    """
    Sync view of `shortcuts.Shortcuts`.

    Every instance attribute of the asyncio shortcuts object is mirrored on
    this object under the same name, wrapped in a sync Node.
    """

    def __init__(self, tloop, aio_server):
        self.tloop = tloop
        self.aio_obj = shortcuts.Shortcuts(aio_server)
        aio_attributes = self.aio_obj.__dict__
        for name, aio_node in aio_attributes.items():
            sync_node = Node(self.tloop, aio_node)
            setattr(self, name, sync_node)
147
148
149
class Server:
    """
    Blocking wrapper around the asyncio `server.Server`.

    Methods decorated with `syncmethod` are stubs: the call is forwarded to
    the method of the same name on `self.aio_obj` and executed in the loop
    thread `self.tloop`. The remaining methods call the asyncio object
    directly from the calling thread.

    If no `tloop` is passed, the server creates and starts its own ThreadLoop
    and stops it again in stop().
    """

    def __init__(self, shelf_file=None, tloop=None):
        """
        :param shelf_file: passed unchanged to the asyncio server's `init`
        :param tloop: an already running ThreadLoop to share, or None to create one
        """
        self.tloop = tloop
        self.close_tloop = False  # True only when the loop thread is owned by this object
        if not self.tloop:
            self.tloop = ThreadLoop()
            self.tloop.start()
            self.close_tloop = True
        try:
            self.aio_obj = server.Server(loop=self.tloop.loop)
            self.tloop.post(self.aio_obj.init(shelf_file))
            self.nodes = Shortcuts(self.tloop, self.aio_obj.iserver.isession)
        except Exception:
            # Nobody gets a reference to a half-built server, so an owned loop
            # thread (non-daemon) must be stopped here or it would run forever.
            if self.close_tloop:
                self.tloop.stop()
            raise

    def __str__(self):
        return "Sync" + self.aio_obj.__str__()
    __repr__ = __str__

    def __enter__(self):
        try:
            self.start()
        except Exception:
            # __exit__ is not called when __enter__ fails; stop an owned loop
            # thread here so it does not keep the process alive.
            if self.close_tloop:
                self.tloop.stop()
            raise
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.stop()

    def set_endpoint(self, url):
        return self.aio_obj.set_endpoint(url)

    def set_server_name(self, name):
        return self.aio_obj.set_server_name(name)

    def set_security_policy(self, security_policy):
        return self.aio_obj.set_security_policy(security_policy)

    def disable_clock(self, boolean):
        return self.aio_obj.disable_clock(boolean)

    @syncmethod
    def register_namespace(self, url):
        """Blocking call to the asyncio server's `register_namespace`."""

    @syncmethod
    def start(self):
        """Blocking call to the asyncio server's `start`."""

    def stop(self):
        """
        Stop the asyncio server and stop the loop thread if it is owned.

        The owned loop is stopped even when stopping the server raises.
        """
        try:
            self.tloop.post(self.aio_obj.stop())
        finally:
            if self.close_tloop:
                self.tloop.stop()

    def link_method(self, node, callback):
        return self.aio_obj.link_method(node, callback)

    @syncmethod
    def get_event_generator(self, etype=None, emitting_node=ua.ObjectIds.Server):
        """Blocking call to the asyncio server's `get_event_generator`."""

    def get_node(self, nodeid):
        """Return a sync Node wrapping the asyncio server's `get_node(nodeid)`."""
        # Must go through the wrapped asyncio server: calling the unbound
        # server.Server.get_node with this sync wrapper as `self` hands the
        # asyncio implementation an object of the wrong type.
        return Node(self.tloop, self.aio_obj.get_node(nodeid))

    @syncmethod
    def import_xml(self, path=None, xmlstring=None):
        """Blocking call to the asyncio server's `import_xml`."""

    @syncmethod
    def get_namespace_index(self, url):
        """Blocking call to the asyncio server's `get_namespace_index`."""

    @syncmethod
    def load_enums(self):
        """Blocking call to the asyncio server's `load_enums`."""

    @syncmethod
    def load_type_definitions(self):
        """Blocking call to the asyncio server's `load_type_definitions`."""

    def set_attribute_value(self, nodeid, datavalue, attr=ua.AttributeIds.Value):
        return self.aio_obj.set_attribute_value(nodeid, datavalue, attr)
225
226
227
class EventGenerator:
    """
    Thin sync facade over an asyncio event generator.

    NOTE(review): trigger() returns whatever the wrapped object's trigger()
    returns, without going through a loop thread - confirm that the wrapped
    method is not a coroutine function.
    """

    def __init__(self, aio_evgen):
        self.aio_obj = aio_evgen

    @property
    def event(self):
        """The `event` attribute of the wrapped generator."""
        wrapped = self.aio_obj
        return wrapped.event

    def trigger(self, time=None, message=None):
        """Call the wrapped generator's `trigger(time, message)` and return its result."""
        wrapped_trigger = self.aio_obj.trigger
        return wrapped_trigger(time, message)
237
238
239
class Node:
    """
    Blocking wrapper around an asyncio node.

    Methods decorated with `syncmethod` are stubs: the call is forwarded to
    the method of the same name on `self.aio_obj` and executed in the loop
    thread `self.tloop`. Equality and hashing are delegated to the wrapped
    asyncio node.
    """

    def __init__(self, tloop, aio_node):
        self.aio_obj = aio_node
        self.tloop = tloop

    def __hash__(self):
        return self.aio_obj.__hash__()

    def __str__(self):
        return "Sync" + self.aio_obj.__str__()
    __repr__ = __str__

    @property
    def nodeid(self):
        """The `nodeid` of the wrapped asyncio node."""
        return self.aio_obj.nodeid

    @syncmethod
    def get_browse_name(self):
        """Blocking call to the asyncio node's `get_browse_name`."""

    @syncmethod
    def get_children(self,
                     refs=ua.ObjectIds.HierarchicalReferences,
                     nodeclassmask=ua.NodeClass.Unspecified):
        """Blocking call to the asyncio node's `get_children`."""

    @syncmethod
    def get_children_descriptions(self,
                                  refs=ua.ObjectIds.HierarchicalReferences,
                                  nodeclassmask=ua.NodeClass.Unspecified,
                                  includesubtypes=True):
        """Blocking call to the asyncio node's `get_children_descriptions`."""

    @syncmethod
    def get_child(self, path):
        """Blocking call to the asyncio node's `get_child`."""

    @syncmethod
    def set_modelling_rule(self, mandatory: bool):
        """Blocking call to the asyncio node's `set_modelling_rule`."""

    @syncmethod
    def add_variable(self, ns, name, val):
        """Blocking call to the asyncio node's `add_variable`."""

    @syncmethod
    def add_property(self, ns, name, val):
        """Blocking call to the asyncio node's `add_property`."""

    @syncmethod
    def add_object(self, ns, name):
        """Blocking call to the asyncio node's `add_object`."""

    @syncmethod
    def add_object_type(self, ns, name):
        """Blocking call to the asyncio node's `add_object_type`."""

    @syncmethod
    def add_folder(self, ns, name):
        """Blocking call to the asyncio node's `add_folder`."""

    @syncmethod
    def add_method(self, *args):
        """Blocking call to the asyncio node's `add_method`."""

    @syncmethod
    def set_writable(self, writable=True):
        """Blocking call to the asyncio node's `set_writable`."""

    @syncmethod
    def set_value(self, val):
        """Blocking call to the asyncio node's `set_value`."""

    @syncmethod
    def get_value(self):
        """Blocking call to the asyncio node's `get_value`."""

    @syncmethod
    def call_method(self, methodid, *args):
        """Blocking call to the asyncio node's `call_method`."""

    def __eq__(self, other):
        # Comparing with something that is not a sync Node must not raise;
        # NotImplemented lets Python fall back to its default handling.
        if not isinstance(other, Node):
            return NotImplemented
        return self.aio_obj == other.aio_obj
320
321
322
class Subscription:
    """
    Blocking wrapper around an asyncio subscription.

    Every method is a `syncmethod` stub: the call is forwarded to the method
    of the same name on `self.aio_obj` and executed in the loop thread
    `self.tloop`.
    """

    def __init__(self, tloop, sub):
        self.tloop = tloop
        self.aio_obj = sub

    @syncmethod
    def subscribe_data_change(self, nodes, attr=ua.AttributeIds.Value, queuesize=0):
        """Blocking call to the asyncio subscription's `subscribe_data_change`."""

    @syncmethod
    def subscribe_events(self,
                         sourcenode=ua.ObjectIds.Server,
                         evtypes=ua.ObjectIds.BaseEventType,
                         evfilter=None,
                         queuesize=0):
        """Blocking call to the asyncio subscription's `subscribe_events`."""

    @syncmethod
    def unsubscribe(self, handle):
        """Blocking call to the asyncio subscription's `unsubscribe`."""

    @syncmethod
    def create_monitored_items(self, monitored_items):
        """Blocking call to the asyncio subscription's `create_monitored_items`."""
        # Declared as a plain `def` like every other stub: the sync API never
        # returns a coroutine, the decorator runs the asyncio method itself.

    @syncmethod
    def delete(self):
        """Blocking call to the asyncio subscription's `delete`."""
350