Completed
Pull Request — master (#170)
by Denis
03:35
created

SocketWrapper   A

Complexity

Total Complexity 6

Size/Duplication

Total Lines 26
Duplicated Lines 0 %

Test Coverage

Coverage 88.24%

Importance

Changes 2
Bugs 0 Features 0
Metric Value
dl 0
loc 26
ccs 15
cts 17
cp 0.8824
rs 10
c 2
b 0
f 0
wmc 6

3 Methods

Rating   Name   Duplication   Size   Complexity  
A write() 0 2 1
A __init__() 0 2 1
A read() 0 15 4
1 1
import logging
2 1
import os
3 1
from concurrent.futures import Future
4 1
import functools
5 1
import threading
6 1
from socket import error as SocketError
7
8 1
try:
9
    # we prefer to use bundles asyncio version, otherwise fallback to trollius
10 1
    import asyncio
11
except ImportError:
12
    import trollius as asyncio
13
14
15 1
from opcua.common.uaerrors import UaError
16
17
18 1
class ServiceError(UaError):
19 1
    def __init__(self, code):
20 1
        super(ServiceError, self).__init__('UA Service Error')
21 1
        self.code = code
22
23
24 1
class NotEnoughData(UaError):
25 1
    pass
26
27
28 1
class SocketClosedException(UaError):
29 1
    pass
30
31
32 1
class Buffer(object):
33
34
    """
35
    alternative to io.BytesIO making debug easier
36
    and added a few conveniance methods
37
    """
38
39 1
    def __init__(self, data, start_pos=0, size=-1):
40
        # self.logger = logging.getLogger(__name__)
41 1
        self._data = data
42 1
        self._cur_pos = start_pos
43 1
        if size == -1:
44 1
            size = len(data) - start_pos
45 1
        self._size = size
46
47 1
    def __str__(self):
48
        return "Buffer(size:{}, data:{})".format(
49
            self._size,
50
            self._data[self._cur_pos:self._cur_pos + self._size])
51 1
    __repr__ = __str__
52
53 1
    def __len__(self):
54 1
        return self._size
55
56 1
    def read(self, size):
57
        """
58
        read and pop number of bytes for buffer
59
        """
60 1
        if size > self._size:
61
            raise NotEnoughData("Not enough data left in buffer, request for {}, we have {}".format(size, self))
62
        # self.logger.debug("Request for %s bytes, from %s", size, self)
63 1
        self._size -= size
64 1
        pos = self._cur_pos
65 1
        self._cur_pos += size
66 1
        data = self._data[pos:self._cur_pos]
67
        # self.logger.debug("Returning: %s ", data)
68 1
        return data
69
70 1
    def copy(self, size=-1):
71
        """
72
        return a shadow copy, optionnaly only copy 'size' bytes
73
        """
74 1
        if size == -1 or size > self._size:
75 1
            size = self._size
76 1
        return Buffer(self._data, self._cur_pos, size)
77
78 1
    def skip(self, size):
79
        """
80
        skip size bytes in buffer
81
        """
82 1
        if size > self._size:
83
            raise NotEnoughData("Not enough data left in buffer, request for {}, we have {}".format(size, self))
84 1
        self._size -= size
85 1
        self._cur_pos += size
86
87
88 1
class SocketWrapper(object):
89
    """
90
    wrapper to make it possible to have same api for 
91
    normal sockets, socket from asyncio, StringIO, etc....
92
    """
93 1
    def __init__(self, sock):
94 1
        self.socket = sock
95
96 1
    def read(self, size):
97
        """
98
        Receive up to size bytes from socket
99
        """
100 1
        data = b''
101 1
        while size > 0:
102 1
            try:
103 1
                chunk = self.socket.recv(size)
104
            except (OSError, SocketError) as ex:
105
                raise SocketClosedException("Server socket has closed", ex)
106 1
            if not chunk:
107 1
                raise SocketClosedException("Server socket has closed")
108 1
            data += chunk
109 1
            size -= len(chunk)
110 1
        return data
111
112 1
    def write(self, data):
113 1
        self.socket.sendall(data)
114
115
116 1
def create_nonce(size=32):
117 1
    return os.urandom(size)
118
119
120 1
class ThreadLoop(threading.Thread):
121
    """
122
    run an asyncio loop in a thread
123
    """
124
125 1
    def __init__(self):
126 1
        threading.Thread.__init__(self)
127 1
        self.logger = logging.getLogger(__name__)
128 1
        self.loop = None
129 1
        self._cond = threading.Condition()
130
131 1
    def start(self):
132 1
        with self._cond:
133 1
            threading.Thread.start(self)
134 1
            self._cond.wait()
135
136 1
    def run(self):
137 1
        self.logger.debug("Starting subscription thread")
138 1
        self.loop = asyncio.new_event_loop()
139 1
        asyncio.set_event_loop(self.loop)
140 1
        with self._cond:
141 1
            self._cond.notify_all()
142 1
        self.loop.run_forever()
143 1
        self.logger.debug("subscription thread ended")
144
145 1
    def create_server(self, proto, hostname, port):
146 1
        return self.loop.create_server(proto, hostname, port)
147
148 1
    def stop(self):
149
        """
150
        stop subscription loop, thus the subscription thread
151
        """
152 1
        self.loop.call_soon_threadsafe(self.loop.stop)
153
154 1
    def call_soon(self, callback):
155 1
        self.loop.call_soon_threadsafe(callback)
156
157 1
    def call_later(self, delay, callback):
158
        """
159
        threadsafe call_later from asyncio
160
        """
161 1
        p = functools.partial(self.loop.call_later, delay, callback)
162 1
        self.loop.call_soon_threadsafe(p)
163
164 1
    def _create_task(self, future, coro, cb=None):
165
        #task = self.loop.create_task(coro)
166 1
        task = asyncio.async(coro, loop=self.loop) 
167 1
        if cb:
168 1
            task.add_done_callback(cb)
169 1
        future.set_result(task)
170
171 1
    def create_task(self, coro, cb=None):
172
        """
173
        threadsafe create_task from asyncio
174
        """
175 1
        future = Future()
176 1
        p = functools.partial(self._create_task, future, coro, cb)
177 1
        self.loop.call_soon_threadsafe(p)
178 1
        return future.result()
179
180 1
    def run_coro_and_wait(self, coro):
181 1
        cond = threading.Condition()
182 1
        def cb(_):
183 1
            with cond:
184 1
                cond.notify_all()
185 1
        with cond:
186 1
            task = self.create_task(coro, cb)
187 1
            cond.wait()
188 1
        return task.result()
189
190 1
    def _run_until_complete(self, future, coro):
191
        task = self.loop.run_until_complete(coro)
192
        future.set_result(task)
193
194 1
    def run_until_complete(self, coro):
195
        """
196
        threadsafe run_until_completed from asyncio
197
        """
198
        future = Future()
199
        p = functools.partial(self._run_until_complete, future, coro)
200
        self.loop.call_soon_threadsafe(p)
201
        return future.result()
202
203
204
205