Completed
Branch master (1a9d89)
by Olivier
04:58
created

SocketWrapper.write()   A

Complexity

Conditions 1

Size

Total Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
c 0
b 0
f 0
dl 0
loc 2
ccs 2
cts 2
cp 1
crap 1
rs 10
1
"""
2
Helper function and classes that do not rely on opcua library.
3
Helper function and classes depending on ua object are in ua_utils.py
4
"""
5
6 1
import logging
7 1
import os
8 1
from concurrent.futures import Future
9 1
import functools
10 1
import threading
11 1
from socket import error as SocketError
12
13 1
try:
14 1
    import asyncio
15
except ImportError:
16
    import trollius as asyncio
17
18
19 1
from opcua.ua.uaerrors import UaError
20
21
22 1
class ServiceError(UaError):
23 1
    def __init__(self, code):
24 1
        super(ServiceError, self).__init__('UA Service Error')
25 1
        self.code = code
26
27
28 1
class NotEnoughData(UaError):
29 1
    pass
30
31
32 1
class SocketClosedException(UaError):
33 1
    pass
34
35
36 1
class Buffer(object):
37
38
    """
39
    alternative to io.BytesIO making debug easier
40
    and added a few conveniance methods
41
    """
42
43 1
    def __init__(self, data, start_pos=0, size=-1):
44
        # self.logger = logging.getLogger(__name__)
45 1
        self._data = data
46 1
        self._cur_pos = start_pos
47 1
        if size == -1:
48 1
            size = len(data) - start_pos
49 1
        self._size = size
50
51 1
    def __str__(self):
52
        return "Buffer(size:{0}, data:{1})".format(
53
            self._size,
54
            self._data[self._cur_pos:self._cur_pos + self._size])
55 1
    __repr__ = __str__
56
57 1
    def __len__(self):
58 1
        return self._size
59
60 1
    def read(self, size):
61
        """
62
        read and pop number of bytes for buffer
63
        """
64 1
        if size > self._size:
65
            raise NotEnoughData("Not enough data left in buffer, request for {0}, we have {1}".format(size, self))
66
        # self.logger.debug("Request for %s bytes, from %s", size, self)
67 1
        self._size -= size
68 1
        pos = self._cur_pos
69 1
        self._cur_pos += size
70 1
        data = self._data[pos:self._cur_pos]
71
        # self.logger.debug("Returning: %s ", data)
72 1
        return data
73
74 1
    def copy(self, size=-1):
75
        """
76
        return a shadow copy, optionnaly only copy 'size' bytes
77
        """
78 1
        if size == -1 or size > self._size:
79 1
            size = self._size
80 1
        return Buffer(self._data, self._cur_pos, size)
81
82 1
    def skip(self, size):
83
        """
84
        skip size bytes in buffer
85
        """
86 1
        if size > self._size:
87
            raise NotEnoughData("Not enough data left in buffer, request for {0}, we have {1}".format(size, self))
88 1
        self._size -= size
89 1
        self._cur_pos += size
90
91
92 1
class SocketWrapper(object):
93
    """
94
    wrapper to make it possible to have same api for
95
    normal sockets, socket from asyncio, StringIO, etc....
96
    """
97
98 1
    def __init__(self, sock):
99 1
        self.socket = sock
100
101 1
    def read(self, size):
102
        """
103
        Receive up to size bytes from socket
104
        """
105 1
        data = b''
106 1
        while size > 0:
107 1
            try:
108 1
                chunk = self.socket.recv(size)
109 1
            except (OSError, SocketError) as ex:
110 1
                raise SocketClosedException("Server socket has closed", ex)
111 1
            if not chunk:
112 1
                raise SocketClosedException("Server socket has closed")
113 1
            data += chunk
114 1
            size -= len(chunk)
115 1
        return data
116
117 1
    def write(self, data):
118 1
        self.socket.sendall(data)
119
120
121 1
def create_nonce(size=32):
122 1
    return os.urandom(size)
123
124
125 1
class ThreadLoop(threading.Thread):
126
    """
127
    run an asyncio loop in a thread
128
    """
129
130 1
    def __init__(self):
131 1
        threading.Thread.__init__(self)
132 1
        self.logger = logging.getLogger(__name__)
133 1
        self.loop = None
134 1
        self._cond = threading.Condition()
135
136 1
    def start(self):
137 1
        with self._cond:
138 1
            threading.Thread.start(self)
139 1
            self._cond.wait()
140
141 1
    def run(self):
142 1
        self.logger.debug("Starting subscription thread")
143 1
        self.loop = asyncio.new_event_loop()
144 1
        asyncio.set_event_loop(self.loop)
145 1
        with self._cond:
146 1
            self._cond.notify_all()
147 1
        self.loop.run_forever()
148 1
        self.logger.debug("subscription thread ended")
149
150 1
    def create_server(self, proto, hostname, port):
151 1
        return self.loop.create_server(proto, hostname, port)
152
153 1
    def stop(self):
154
        """
155
        stop subscription loop, thus the subscription thread
156
        """
157 1
        self.loop.call_soon_threadsafe(self.loop.stop)
158
159 1
    def call_soon(self, callback):
160 1
        self.loop.call_soon_threadsafe(callback)
161
162 1
    def call_later(self, delay, callback):
163
        """
164
        threadsafe call_later from asyncio
165
        """
166 1
        p = functools.partial(self.loop.call_later, delay, callback)
167 1
        self.loop.call_soon_threadsafe(p)
168
169 1
    def _create_task(self, future, coro, cb=None):
170
        #task = self.loop.create_task(coro)
171 1
        task = asyncio.async(coro, loop=self.loop) 
172 1
        if cb:
173 1
            task.add_done_callback(cb)
174 1
        future.set_result(task)
175
176 1
    def create_task(self, coro, cb=None):
177
        """
178
        threadsafe create_task from asyncio
179
        """
180 1
        future = Future()
181 1
        p = functools.partial(self._create_task, future, coro, cb)
182 1
        self.loop.call_soon_threadsafe(p)
183 1
        return future.result()
184
185 1
    def run_coro_and_wait(self, coro):
186 1
        cond = threading.Condition()
187 1
        def cb(_):
188 1
            with cond:
189 1
                cond.notify_all()
190 1
        with cond:
191 1
            task = self.create_task(coro, cb)
192 1
            cond.wait()
193 1
        return task.result()
194
195 1
    def _run_until_complete(self, future, coro):
196
        task = self.loop.run_until_complete(coro)
197
        future.set_result(task)
198
199 1
    def run_until_complete(self, coro):
200
        """
201
        threadsafe run_until_completed from asyncio
202
        """
203
        future = Future()
204
        p = functools.partial(self._run_until_complete, future, coro)
205
        self.loop.call_soon_threadsafe(p)
206
        return future.result()
207
208
209
210