Completed
Pull Request — master (#149)
by Denis
02:36
created

opcua.common.SocketWrapper.__init__()   A

Complexity

Conditions 1

Size

Total Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1
Metric Value
cc 1
dl 0
loc 2
ccs 2
cts 2
cp 1
crap 1
rs 10
1 1
import logging
2 1
import os
3 1
from concurrent.futures import Future
4 1
import functools
5 1
import threading
6
7 1
try:
8
    # we prefer to use the bundled asyncio version, otherwise fall back to trollius
9 1
    import asyncio
10
except ImportError:
11
    import trollius as asyncio
12
13
14 1
from opcua.common.uaerrors import UaError
15
16
17 1
class ServiceError(UaError):

    """
    UaError subclass carrying a caller-supplied code

    The exception message is always 'UA Service Error'; the value given
    to the constructor is stored unchanged in the 'code' attribute.
    """

    def __init__(self, code):
        super(ServiceError, self).__init__('UA Service Error')
        self.code = code
21
22
23 1
class NotEnoughData(UaError):

    """
    raised by Buffer.read() and Buffer.skip() when more bytes are
    requested than are left in the buffer
    """
25
26
27 1
class SocketClosedException(UaError):

    """
    raised by SocketWrapper.read() when recv() fails or returns an
    empty chunk
    """
29
30
31 1
class Buffer(object):

    """
    alternative to io.BytesIO making debug easier
    and added a few convenience methods

    A Buffer is a window over 'data': it starts at '_cur_pos' and is
    '_size' bytes long. Reading and skipping move the window start
    forward and shrink it; the underlying data is never modified.
    """

    def __init__(self, data, start_pos=0, size=-1):
        """
        data: sliceable object supporting len() (e.g. bytes)
        start_pos: index in data where the window starts
        size: length of the window, -1 means everything after start_pos
        """
        self._data = data
        self._cur_pos = start_pos
        if size == -1:
            size = len(data) - start_pos
        self._size = size

    def __str__(self):
        # only the part still available for reading is shown
        return "Buffer(size:{}, data:{})".format(
            self._size,
            self._data[self._cur_pos:self._cur_pos + self._size])
    __repr__ = __str__

    def __len__(self):
        """
        number of bytes left to read
        """
        return self._size

    def read(self, size):
        """
        read and pop number of bytes from buffer
        raise NotEnoughData if less than size bytes are left
        """
        if size > self._size:
            raise NotEnoughData("Not enough data left in buffer, request for {}, we have {}".format(size, self))
        self._size -= size
        pos = self._cur_pos
        self._cur_pos += size
        data = self._data[pos:self._cur_pos]
        return data

    def copy(self, size=-1):
        """
        return a shallow copy, optionally only copy 'size' bytes
        the copy shares the underlying data but has its own position,
        so reading from it does not consume this buffer
        """
        if size == -1 or size > self._size:
            size = self._size
        return Buffer(self._data, self._cur_pos, size)

    def skip(self, size):
        """
        skip size bytes in buffer
        raise NotEnoughData if less than size bytes are left
        """
        if size > self._size:
            raise NotEnoughData("Not enough data left in buffer, request for {}, we have {}".format(size, self))
        self._size -= size
        self._cur_pos += size
85
86
87 1
class SocketWrapper(object):

    """
    wrapper to make it possible to have same api for
    normal sockets, socket from asyncio, StringIO, etc....
    """

    def __init__(self, sock):
        # sock must provide recv(size) and sendall(data)
        self.socket = sock

    def read(self, size):
        """
        Receive exactly size bytes from socket

        recv() is called repeatedly until size bytes have been collected.
        Raise SocketClosedException if recv() fails or returns an empty
        chunk before that.
        """
        chunks = []
        while size > 0:
            try:
                chunk = self.socket.recv(size)
            except (OSError, IOError):
                # IOError is an alias of OSError on python3 (so connection
                # resets are covered) and the base of socket.error on
                # python2, where ConnectionResetError does not exist
                raise SocketClosedException("Server socket has closed")
            if not chunk:
                raise SocketClosedException("Server socket has closed")
            chunks.append(chunk)
            size -= len(chunk)
        # join once instead of growing a bytes object chunk by chunk
        return b''.join(chunks)

    def write(self, data):
        """
        send all of data through the socket
        """
        self.socket.sendall(data)
113
114
115 1
def create_nonce(size=32):
    """
    return 'size' random bytes obtained from os.urandom
    """
    return os.urandom(size)
117
118
119 1
class ThreadLoop(threading.Thread):

    """
    run an asyncio loop in a thread

    The loop is created by run() in the new thread. The other methods
    hand work over to that loop with call_soon_threadsafe. The methods
    that wait for a result (create_task, run_coro_and_wait,
    run_until_complete) block the calling thread, so they must not be
    called from the loop thread itself.
    """

    def __init__(self):
        threading.Thread.__init__(self)
        self.logger = logging.getLogger(__name__)
        self.loop = None
        self._cond = threading.Condition()

    def start(self):
        """
        start the thread and block until its event loop has been created
        """
        with self._cond:
            threading.Thread.start(self)
            # run() assigns self.loop before notifying; checking it guards
            # against returning early on a spurious wakeup
            while self.loop is None:
                self._cond.wait()

    def run(self):
        self.logger.debug("Starting subscription thread")
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)
        with self._cond:
            self._cond.notify_all()
        self.loop.run_forever()
        self.logger.debug("subscription thread ended")

    def create_server(self, proto, hostname, port):
        """
        return the result of loop.create_server(proto, hostname, port)
        """
        return self.loop.create_server(proto, hostname, port)

    def stop(self):
        """
        stop subscription loop, thus the subscription thread
        """
        self.loop.call_soon_threadsafe(self.loop.stop)

    def call_soon(self, callback):
        """
        threadsafe call_soon from asyncio
        """
        self.loop.call_soon_threadsafe(callback)

    def call_later(self, delay, callback):
        """
        threadsafe call_later from asyncio
        """
        p = functools.partial(self.loop.call_later, delay, callback)
        self.loop.call_soon_threadsafe(p)

    def _ensure_future(self, coro):
        """
        wrap coro in a task on our loop, must run in the loop thread
        """
        # 'async' is a reserved word since python 3.7, so the old
        # asyncio.async() can only be reached through getattr; it is kept
        # as fallback for versions without ensure_future
        schedule = getattr(asyncio, "ensure_future", None)
        if schedule is None:
            schedule = getattr(asyncio, "async")
        return schedule(coro, loop=self.loop)

    def _create_task(self, future, coro, cb=None):
        """
        loop-side part of create_task: resolve 'future' with the new task
        """
        try:
            task = self._ensure_future(coro)
            if cb:
                task.add_done_callback(cb)
        except Exception as ex:
            # hand the error to the waiting thread instead of leaving it
            # blocked forever on future.result()
            future.set_exception(ex)
        else:
            future.set_result(task)

    def create_task(self, coro, cb=None):
        """
        threadsafe create_task from asyncio
        block until the task has been created in the loop thread and
        return it; cb, if given, is added as done callback of the task
        """
        future = Future()
        p = functools.partial(self._create_task, future, coro, cb)
        self.loop.call_soon_threadsafe(p)
        return future.result()

    def run_coro_and_wait(self, coro):
        """
        run coro in the loop thread, block until it is done and return
        its result (or raise its exception)
        """
        cond = threading.Condition()

        def cb(_):
            with cond:
                cond.notify_all()
        with cond:
            # cond is held from before the task exists until wait(), so
            # the notification from cb cannot be missed
            task = self.create_task(coro, cb)
            while not task.done():
                cond.wait()
        return task.result()

    def _run_until_complete(self, future, coro):
        """
        loop-side part of run_until_complete: run coro as a task and
        copy its outcome into 'future'
        """
        # loop.run_until_complete() cannot be used here: we are called by
        # the loop while it is running, so it would raise and 'future'
        # would never be resolved
        def transfer(task):
            if task.cancelled():
                future.cancel()
                return
            ex = task.exception()
            if ex is not None:
                future.set_exception(ex)
            else:
                future.set_result(task.result())
        try:
            task = self._ensure_future(coro)
        except Exception as ex:
            future.set_exception(ex)
            return
        task.add_done_callback(transfer)

    def run_until_complete(self, coro):
        """
        threadsafe run_until_complete from asyncio
        block until coro has finished in the loop thread and return its
        result, or raise the exception it raised
        """
        future = Future()
        p = functools.partial(self._run_until_complete, future, coro)
        self.loop.call_soon_threadsafe(p)
        return future.result()
201
202
203
204