Completed
Pull Request — master (#232)
by
unknown
03:55
created

Event.getName()   A

Complexity

Conditions 1

Size

Total Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 1
c 1
b 0
f 0
dl 0
loc 2
ccs 0
cts 0
cp 0
crap 2
rs 10
1 1
import logging
2 1
import os
3 1
from concurrent.futures import Future
4 1
import functools
5 1
import threading
6 1
from socket import error as SocketError
7
from collections import OrderedDict
8 1
9
try:
10 1
    # we prefer to use bundles asyncio version, otherwise fallback to trollius
11
    import asyncio
12
except ImportError:
13
    import trollius as asyncio
14
15 1
16
from opcua.common.uaerrors import UaError
17
18 1
19 1
class ServiceError(UaError):
20 1
    def __init__(self, code):
21 1
        super(ServiceError, self).__init__('UA Service Error')
22
        self.code = code
23
24 1
25 1
class NotEnoughData(UaError):
26
    pass
27
28 1
29 1
class SocketClosedException(UaError):
30
    pass
31
32 1
33
class Buffer(object):
34
35
    """
36
    alternative to io.BytesIO making debug easier
37
    and added a few conveniance methods
38
    """
39 1
40
    def __init__(self, data, start_pos=0, size=-1):
41 1
        # self.logger = logging.getLogger(__name__)
42 1
        self._data = data
43 1
        self._cur_pos = start_pos
44 1
        if size == -1:
45 1
            size = len(data) - start_pos
46
        self._size = size
47 1
48
    def __str__(self):
49
        return "Buffer(size:{}, data:{})".format(
50
            self._size,
51 1
            self._data[self._cur_pos:self._cur_pos + self._size])
52
    __repr__ = __str__
53 1
54 1
    def __len__(self):
55
        return self._size
56 1
57
    def read(self, size):
58
        """
59
        read and pop number of bytes for buffer
60 1
        """
61
        if size > self._size:
62
            raise NotEnoughData("Not enough data left in buffer, request for {}, we have {}".format(size, self))
63 1
        # self.logger.debug("Request for %s bytes, from %s", size, self)
64 1
        self._size -= size
65 1
        pos = self._cur_pos
66 1
        self._cur_pos += size
67
        data = self._data[pos:self._cur_pos]
68 1
        # self.logger.debug("Returning: %s ", data)
69
        return data
70 1
71
    def copy(self, size=-1):
72
        """
73
        return a shadow copy, optionnaly only copy 'size' bytes
74 1
        """
75 1
        if size == -1 or size > self._size:
76 1
            size = self._size
77
        return Buffer(self._data, self._cur_pos, size)
78 1
79
    def skip(self, size):
80
        """
81
        skip size bytes in buffer
82 1
        """
83
        if size > self._size:
84 1
            raise NotEnoughData("Not enough data left in buffer, request for {}, we have {}".format(size, self))
85 1
        self._size -= size
86
        self._cur_pos += size
87
88 1
89
class SocketWrapper(object):
90
    """
91
    wrapper to make it possible to have same api for 
92
    normal sockets, socket from asyncio, StringIO, etc....
93 1
    """
94 1
    def __init__(self, sock):
95
        self.socket = sock
96 1
97
    def read(self, size):
98
        """
99
        Receive up to size bytes from socket
100 1
        """
101 1
        data = b''
102 1
        while size > 0:
103 1
            try:
104 1
                chunk = self.socket.recv(size)
105 1
            except (OSError, SocketError) as ex:
106 1
                raise SocketClosedException("Server socket has closed", ex)
107 1
            if not chunk:
108 1
                raise SocketClosedException("Server socket has closed")
109 1
            data += chunk
110 1
            size -= len(chunk)
111
        return data
112 1
113 1
    def write(self, data):
114
        self.socket.sendall(data)
115
116 1
117 1
def create_nonce(size=32):
118
    return os.urandom(size)
119
120 1
121
class ThreadLoop(threading.Thread):
122
    """
123
    run an asyncio loop in a thread
124
    """
125 1
126 1
    def __init__(self):
127 1
        threading.Thread.__init__(self)
128 1
        self.logger = logging.getLogger(__name__)
129 1
        self.loop = None
130
        self._cond = threading.Condition()
131 1
132 1
    def start(self):
133 1
        with self._cond:
134 1
            threading.Thread.start(self)
135
            self._cond.wait()
136 1
137 1
    def run(self):
138 1
        self.logger.debug("Starting subscription thread")
139 1
        self.loop = asyncio.new_event_loop()
140 1
        asyncio.set_event_loop(self.loop)
141 1
        with self._cond:
142 1
            self._cond.notify_all()
143 1
        self.loop.run_forever()
144
        self.logger.debug("subscription thread ended")
145 1
146 1
    def create_server(self, proto, hostname, port):
147
        return self.loop.create_server(proto, hostname, port)
148 1
149
    def stop(self):
150
        """
151
        stop subscription loop, thus the subscription thread
152 1
        """
153
        self.loop.call_soon_threadsafe(self.loop.stop)
154 1
155 1
    def call_soon(self, callback):
156
        self.loop.call_soon_threadsafe(callback)
157 1
158
    def call_later(self, delay, callback):
159
        """
160
        threadsafe call_later from asyncio
161 1
        """
162 1
        p = functools.partial(self.loop.call_later, delay, callback)
163
        self.loop.call_soon_threadsafe(p)
164 1
165
    def _create_task(self, future, coro, cb=None):
166 1
        #task = self.loop.create_task(coro)
167 1
        task = asyncio.async(coro, loop=self.loop) 
168 1
        if cb:
169 1
            task.add_done_callback(cb)
170
        future.set_result(task)
171 1
172
    def create_task(self, coro, cb=None):
173
        """
174
        threadsafe create_task from asyncio
175 1
        """
176 1
        future = Future()
177 1
        p = functools.partial(self._create_task, future, coro, cb)
178 1
        self.loop.call_soon_threadsafe(p)
179
        return future.result()
180 1
181 1
    def run_coro_and_wait(self, coro):
182 1
        cond = threading.Condition()
183 1
        def cb(_):
184 1
            with cond:
185 1
                cond.notify_all()
186 1
        with cond:
187 1
            task = self.create_task(coro, cb)
188 1
            cond.wait()
189
        return task.result()
190 1
191
    def _run_until_complete(self, future, coro):
192
        task = self.loop.run_until_complete(coro)
193
        future.set_result(task)
194 1
195
    def run_until_complete(self, coro):
196
        """
197
        threadsafe run_until_completed from asyncio
198
        """
199
        future = Future()
200
        p = functools.partial(self._run_until_complete, future, coro)
201
        self.loop.call_soon_threadsafe(p)
202
        return future.result()
203
    
204
205
206
207
class Event(object):
208
    def __init__(self):
209
        self.__name = None
210
211
    def setName(self, name):
212
        self.__name = name
213
214
    def getName(self):
215
        return self.__name
216
217
218
class ServerItemEvent(Event):
219
    def __init__(self, request_params, response_params):
220
        self.request_params = request_params
221
        self.response_params = response_params
222
        
223
224
class EventSubscriberInterface(object):
225
    def getSubscribedEvents(self):
226
        raise NotImplementedError()
227
228
229
class EventDispatcher(object):
230
    def __init__(self):
231
        self._listeners = {}
232
233
    def dispatch(self, eventName, event=None):
234
        if event is None:
235
            event = Event()
236
        elif not isinstance(event, Event):
237
            raise ValueError('Unexpected event type given')
238
        event.setName(eventName)
239
        if eventName not in self._listeners:
240
            return event
241
        for listener in self._listeners[eventName].values():
242
            listener(event, self)
243
        return event
244
245
    def addListener(self, eventName, listener, priority=0):
246
        if eventName not in self._listeners:
247
            self._listeners[eventName] = {}
248
        self._listeners[eventName][priority] = listener
249
        self._listeners[eventName] = OrderedDict(sorted(self._listeners[eventName].items(), key=lambda item: item[0]))
250
251
    def removeListener(self, eventName, listener=None):
252
        if eventName not in self._listeners:
253
            return
254
        if not listener:
255
            del self._listeners[eventName]
256
        else:
257
            for p, l in self._listeners[eventName].items():
258
                if l is listener:
259
                    self._listeners[eventName].pop(p)
260
                    return
261
262
    def addSubscriber(self, subscriber):
263
        if not isinstance(subscriber, EventSubscriberInterface):
264
            raise ValueError('Unexpected subscriber type given')
265
        for eventName, params in subscriber.getSubscribedEvents().items():
266
            if isinstance(params, str):
267
                self.addListener(eventName, getattr(subscriber, params))
268
            elif isinstance(params, list):
269
                if not params:
270
                    raise ValueError('Invalid params "%r" for event "%s"' % (params, eventName))
271
                if len(params) <= 2 and isinstance(params[0], str):
272
                    priority = params[1] if len(params) > 1 else 0
273
                    self.addListener(eventName, getattr(subscriber, params[0]), priority)
274
                else:
275
                    for listener in params:
276
                        priority = listener[1] if len(listener) > 1 else 0
277
                        self.addListener(eventName, getattr(subscriber, listener[0]), priority)
278
            else:
279
                raise ValueError('Invalid params for event "%s"' % eventName)
280
281
282
283
284
285
286