Completed
Push — master ( 158c0f...3f750a )
by Olivier
04:09
created

SocketWrapper.read()   B

Complexity

Conditions 5

Size

Total Lines 17

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 13
CRAP Score 5

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 5
c 1
b 0
f 0
dl 0
loc 17
ccs 13
cts 13
cp 1
crap 5
rs 8.5454
1 1
import logging
2 1
import os
3 1
from concurrent.futures import Future
4 1
import functools
5 1
import threading
6 1
from socket import error as SocketError
7
from socket import timeout as SocketTimeout
8 1
9
try:
10 1
    # we prefer to use the bundled asyncio version, otherwise fall back to trollius
11
    import asyncio
12
except ImportError:
13
    import trollius as asyncio
14
15 1
16
from opcua.common.uaerrors import UaError
17
18 1
19 1
class ServiceError(UaError):
    """
    UaError with the fixed message 'UA Service Error' that keeps the
    code given to the constructor in the ``code`` attribute
    """

    def __init__(self, code):
        message = 'UA Service Error'
        super(ServiceError, self).__init__(message)
        # assigned after the base initializer, as the base class is not
        # visible from this module
        self.code = code
23
24 1
25 1
class NotEnoughData(UaError):
    """
    raised by Buffer when more bytes are requested than are left in it
    """
27
28 1
29 1
class SocketClosedException(UaError):
    """
    raised by SocketWrapper.read when the socket fails or returns no data
    """
31
32 1
33
class SocketTimeoutException(UaError):
    """
    raised by SocketWrapper.read when receiving from the socket times out
    """
35
36
37
class Buffer(object):
    """
    alternative to io.BytesIO making debug easier
    and added a few convenience methods

    A Buffer is a window over ``data``: ``_cur_pos`` is the index of the
    next unread byte and ``_size`` the number of bytes still available.
    Reading or skipping moves the window forward; ``data`` itself is never
    modified.
    """

    def __init__(self, data, start_pos=0, size=-1):
        """
        data: sliceable object holding the bytes
        start_pos: index in data of the first available byte
        size: number of available bytes, -1 means everything after start_pos
        """
        self._data = data
        self._cur_pos = start_pos
        if size == -1:
            size = len(data) - start_pos
        self._size = size

    def __str__(self):
        return "Buffer(size:{}, data:{})".format(
            self._size,
            self._data[self._cur_pos:self._cur_pos + self._size])
    __repr__ = __str__

    def __len__(self):
        """
        number of bytes left in buffer
        """
        return self._size

    def _consume(self, size):
        """
        advance the window by size bytes and return the previous position

        Shared bookkeeping for read() and skip().
        Raises ValueError for a negative size and NotEnoughData when size
        exceeds the number of bytes left; the buffer is unchanged in both
        cases.
        """
        if size < 0:
            # a negative size would otherwise grow the window and move the
            # cursor backwards, silently corrupting the buffer
            raise ValueError("Buffer size must not be negative, got {}".format(size))
        if size > self._size:
            raise NotEnoughData("Not enough data left in buffer, request for {}, we have {}".format(size, self))
        start = self._cur_pos
        self._cur_pos += size
        self._size -= size
        return start

    def read(self, size):
        """
        read and pop number of bytes for buffer

        Returns the slice of the underlying data that was consumed.
        Raises NotEnoughData if fewer than size bytes are left and
        ValueError if size is negative.
        """
        start = self._consume(size)
        return self._data[start:self._cur_pos]

    def copy(self, size=-1):
        """
        return a shallow copy, optionally only copy 'size' bytes

        The copy shares the underlying data but has its own position, so
        consuming from it does not affect this buffer. A negative size or a
        size larger than what is left means "everything that is left".
        """
        if size < 0 or size > self._size:
            size = self._size
        return Buffer(self._data, self._cur_pos, size)

    def skip(self, size):
        """
        skip size bytes in buffer

        Raises NotEnoughData if fewer than size bytes are left and
        ValueError if size is negative.
        """
        self._consume(size)
91
92
93 1
class SocketWrapper(object):
    """
    wrapper to make it possible to have same api for
    normal sockets, socket from asyncio, StringIO, etc....

    The wrapped object only needs to provide recv(size) and sendall(data).
    """

    def __init__(self, sock):
        self.socket = sock

    def read(self, size):
        """
        Receive exactly size bytes from socket

        recv() is called repeatedly until size bytes have been collected;
        a size of zero or less returns an empty bytes object without
        touching the socket.

        Raises SocketTimeoutException if recv() times out and
        SocketClosedException if recv() fails with a socket/OS error or
        returns no data before size bytes were received.
        """
        # collect the chunks and join once instead of growing a bytes
        # object on every iteration
        chunks = []
        remaining = size
        while remaining > 0:
            try:
                chunk = self.socket.recv(remaining)
            except SocketTimeout:
                # must be handled before the generic socket error below,
                # since a timeout is itself a socket error
                raise SocketTimeoutException("Server socket has timed out")
            except (OSError, SocketError) as ex:
                raise SocketClosedException("Server socket has closed", ex)
            if not chunk:
                # an empty result means no more data will arrive
                raise SocketClosedException("Server socket has closed")
            chunks.append(chunk)
            remaining -= len(chunk)
        return b''.join(chunks)

    def write(self, data):
        """
        send all of data through the socket
        """
        self.socket.sendall(data)
122
123
124
def create_nonce(size=32):
    """
    return size random bytes obtained from os.urandom
    """
    nonce = os.urandom(size)
    return nonce
126 1
127 1
128 1
class ThreadLoop(threading.Thread):
    """
    run an asyncio loop in a thread

    The loop is created inside the thread by run(); the other methods
    hand work over to it from any thread through call_soon_threadsafe.
    """

    def __init__(self):
        threading.Thread.__init__(self)
        self.logger = logging.getLogger(__name__)
        # set by run() once the thread is up
        self.loop = None
        # used by run() to tell start() that the loop has been created
        self._cond = threading.Condition()

    def start(self):
        """
        start the thread and block until its event loop has been created
        """
        with self._cond:
            # the condition is held while starting, so run() cannot notify
            # before we are waiting
            threading.Thread.start(self)
            self._cond.wait()

    def run(self):
        """
        thread body: create the loop, wake up start() and run forever
        """
        self.logger.debug("Starting subscription thread")
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)
        with self._cond:
            self._cond.notify_all()
        self.loop.run_forever()
        self.logger.debug("subscription thread ended")

    def create_server(self, proto, hostname, port):
        """
        return the result of loop.create_server(proto, hostname, port)

        The value is returned unchanged; with asyncio it is a coroutine
        that still has to be scheduled, e.g. with run_coro_and_wait.
        """
        return self.loop.create_server(proto, hostname, port)

    def stop(self):
        """
        stop subscription loop, thus the subscription thread
        """
        self.loop.call_soon_threadsafe(self.loop.stop)

    def call_soon(self, callback):
        """
        threadsafe call_soon from asyncio
        """
        self.loop.call_soon_threadsafe(callback)

    def call_later(self, delay, callback):
        """
        threadsafe call_later from asyncio
        """
        p = functools.partial(self.loop.call_later, delay, callback)
        self.loop.call_soon_threadsafe(p)

    def _create_task(self, future, coro, cb=None):
        """
        runs in the loop thread: schedule coro and hand the task over

        future receives the created task, or the exception raised while
        creating it so that the thread waiting in create_task is never
        left blocked.
        """
        try:
            # 'async' is a reserved word from Python 3.7 on, so the old
            # asyncio.async() spelling can only be reached through getattr;
            # ensure_future is preferred wherever it exists
            schedule = getattr(asyncio, "ensure_future", None)
            if schedule is None:
                schedule = getattr(asyncio, "async")
            task = schedule(coro, loop=self.loop)
            if cb:
                task.add_done_callback(cb)
        except Exception as ex:
            future.set_exception(ex)
        else:
            future.set_result(task)

    def create_task(self, coro, cb=None):
        """
        threadsafe create_task from asyncio

        Blocks until the loop thread has created the task and returns it.
        cb, if given, is added as done callback of the task and is called
        from the loop thread. An error raised while creating the task is
        re-raised here.
        """
        future = Future()
        p = functools.partial(self._create_task, future, coro, cb)
        self.loop.call_soon_threadsafe(p)
        return future.result()

    def run_coro_and_wait(self, coro):
        """
        run coro in the loop thread, wait for it and return its result

        An exception raised by the coroutine is re-raised in the calling
        thread by task.result().
        """
        done = threading.Event()

        def cb(_):
            done.set()

        task = self.create_task(coro, cb)
        done.wait()
        return task.result()

    def _run_until_complete(self, future, coro):
        """
        runs in the loop thread: call loop.run_until_complete(coro)

        The outcome, result or exception, is forwarded to future so the
        thread waiting in run_until_complete always gets woken up.
        """
        try:
            result = self.loop.run_until_complete(coro)
        except Exception as ex:
            future.set_exception(ex)
        else:
            future.set_result(result)

    def run_until_complete(self, coro):
        """
        threadsafe run_until_completed from asyncio

        NOTE(review): the call is executed from a callback of the loop
        itself; asyncio refuses run_until_complete on a loop that is
        already running, in which case that error is raised here. Consider
        run_coro_and_wait instead - confirm against callers.
        """
        future = Future()
        p = functools.partial(self._run_until_complete, future, coro)
        self.loop.call_soon_threadsafe(p)
        return future.result()
210
211
212
213