Completed
Push — master ( 5d9c38...b34dc2 )
by Olivier
03:41 queued 01:09
created

opcua.Buffer.__len__()   A

Complexity

Conditions 1

Size

Total Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1
Metric Value
dl 0
loc 2
ccs 2
cts 2
cp 1
rs 10
cc 1
crap 1
1 1
import logging
2 1
import os
3 1
from concurrent.futures import Future
4 1
import functools
5 1
import threading
6 1
try:
7
    # we prefer the bundled asyncio version, otherwise fall back to trollius
8 1
    import asyncio
9
except ImportError:
10
    import trollius as asyncio
11
    from trollius import From
12
13 1
class ServiceError(Exception):

    """
    Exception carrying the code of a UA service error.

    The exception message is always 'UA service error'; the code given
    by the caller is kept on the ``code`` attribute.
    """

    def __init__(self, code):
        message = 'UA service error'
        super(ServiceError, self).__init__(message)
        self.code = code
17
18 1
class NotEnoughData(Exception):

    """
    Raised by Buffer.read() and Buffer.skip() when more bytes are
    requested than the buffer still holds.
    """
20
21 1
class Buffer(object):

    """
    Alternative to io.BytesIO that makes debugging easier
    and adds a few convenience methods.

    The buffer is a window of ``_size`` items over ``_data`` starting at
    ``_cur_pos``; reading or skipping moves the start of the window
    forward and shrinks it. The underlying data is never modified.
    """

    def __init__(self, data, start_pos=0, size=-1):
        """
        data: sliceable sequence backing the buffer
        start_pos: index in data where the window begins
        size: length of the window, -1 meaning "everything after start_pos"
        """
        self._data = data
        self._cur_pos = start_pos
        self._size = len(data) - start_pos if size == -1 else size

    def __str__(self):
        window_end = self._cur_pos + self._size
        remaining = self._data[self._cur_pos:window_end]
        return "Buffer(size:{}, data:{})".format(self._size, remaining)
    __repr__ = __str__

    def __len__(self):
        """
        number of items left in the buffer
        """
        return self._size

    def read(self, size):
        """
        Return the next 'size' items and remove them from the buffer.
        Raises NotEnoughData if fewer than 'size' items are left.
        """
        if size > self._size:
            raise NotEnoughData("Not enough data left in buffer, request for {}, we have {}".format(size, self))
        begin = self._cur_pos
        end = begin + size
        self._size -= size
        self._cur_pos = end
        return self._data[begin:end]

    def copy(self, size=-1):
        """
        Return a shallow copy: a new Buffer over the same underlying data,
        starting at the current position. Optionally limit the copy to
        'size' items; -1 or a value larger than what is left means
        "everything that is left".
        """
        wants_all = size == -1 or size > self._size
        length = self._size if wants_all else size
        return Buffer(self._data, self._cur_pos, length)

    def skip(self, size):
        """
        Drop the next 'size' items from the buffer without returning them.
        Raises NotEnoughData if fewer than 'size' items are left.
        """
        if size > self._size:
            raise NotEnoughData("Not enough data left in buffer, request for {}, we have {}".format(size, self))
        self._cur_pos += size
        self._size -= size
75
76
77
78 1
class SocketClosedException(Exception):

    """
    Raised by SocketWrapper.read() when the socket returns no data
    before the requested number of bytes has been received.
    """
80
81 1
class SocketWrapper(object):

    """
    Wrapper exposing a simple read/write API on top of any object that
    provides recv() and sendall() (normal sockets and look-alikes).
    """

    def __init__(self, sock):
        self.socket = sock

    def read(self, size):
        """
        Receive exactly 'size' bytes from the socket.

        recv() is called repeatedly until the requested amount has been
        collected. Raises SocketClosedException if recv() returns no data
        before that. A size of 0 (or less) returns b'' without touching
        the socket.
        """
        chunks = []
        remaining = size
        while remaining > 0:
            chunk = self.socket.recv(remaining)
            if not chunk:
                raise SocketClosedException("Server socket has closed")
            chunks.append(chunk)
            remaining -= len(chunk)
        # join once instead of re-allocating the accumulated bytes per chunk
        return b''.join(chunks)

    def write(self, data):
        """
        Send all of 'data' through the socket's sendall().
        """
        self.socket.sendall(data)
104
105
106
107 1
def create_nonce(size=32):
    """
    Return 'size' random bytes obtained from os.urandom (32 by default).
    """
    nonce = os.urandom(size)
    return nonce
109
110
111
112
113 1
class ThreadLoop(threading.Thread):

    """
    Run an asyncio event loop in a dedicated thread and offer helpers to
    schedule callbacks and coroutines on that loop from other threads.
    """

    def __init__(self):
        threading.Thread.__init__(self)
        self.logger = logging.getLogger(__name__)
        self.loop = None
        self._cond = threading.Condition()

    def start(self):
        """
        Start the thread and block until run() has created the event loop.
        """
        # the condition is held before the thread starts, so run() cannot
        # notify before we are actually waiting
        with self._cond:
            threading.Thread.start(self)
            self._cond.wait()

    def run(self):
        """
        Thread body: create the loop, wake up start(), then run the loop
        until stop() is called.
        """
        self.logger.debug("Starting subscription thread")
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)
        with self._cond:
            self._cond.notify_all()
        self.loop.run_forever()
        self.logger.debug("subscription thread ended")

    def create_server(self, proto, hostname, port):
        """
        Return the result of loop.create_server(proto, hostname, port).
        """
        return self.loop.create_server(proto, hostname, port)

    def stop(self):
        """
        stop subscription loop, thus the subscription thread
        """
        self.loop.call_soon_threadsafe(self.loop.stop)

    def call_soon(self, callback):
        """
        threadsafe call_soon from asyncio
        """
        self.loop.call_soon_threadsafe(callback)

    def call_later(self, delay, callback):
        """
        threadsafe call_later from asyncio
        """
        p = functools.partial(self.loop.call_later, delay, callback)
        self.loop.call_soon_threadsafe(p)

    def _ensure_future(self, coro):
        """
        Schedule coro on self.loop and return the resulting task.
        Meant to be called from the loop thread.
        """
        # "async" is a reserved word since Python 3.7, so the historical
        # asyncio.async(...) spelling no longer even parses. Prefer
        # ensure_future and only fall back to the old name, looked up as a
        # string, on asyncio/trollius versions that lack it.
        schedule = getattr(asyncio, "ensure_future", None)
        if schedule is None:
            schedule = getattr(asyncio, "async")
        return schedule(coro, loop=self.loop)

    def _create_task(self, future, coro, cb=None):
        """
        Loop-thread half of create_task(): schedule coro, attach cb as a
        done callback if given, and hand the task back through 'future'.
        """
        try:
            task = self._ensure_future(coro)
            if cb:
                task.add_done_callback(cb)
        except Exception as ex:
            # without this the thread blocked in create_task() waits forever
            future.set_exception(ex)
            return
        future.set_result(task)

    def create_task(self, coro, cb=None):
        """
        threadsafe create_task from asyncio

        Blocks until the task has been created in the loop thread and
        returns it. 'cb', if given, is called in the loop thread with the
        task once it is done.
        """
        future = Future()
        p = functools.partial(self._create_task, future, coro, cb)
        self.loop.call_soon_threadsafe(p)
        return future.result()

    def run_coro_and_wait(self, coro):
        """
        Schedule coro on the loop, block until it is done and return its
        result (or raise its exception).
        """
        cond = threading.Condition()

        def cb(_):
            with cond:
                cond.notify_all()
        # cond is held while the task is created so that cb, which runs in
        # the loop thread, cannot notify before we are waiting
        with cond:
            task = self.create_task(coro, cb)
            cond.wait()
        return task.result()

    @staticmethod
    def _forward_result(future, task):
        """
        Copy the outcome (result or exception) of a finished asyncio task
        to a concurrent.futures.Future.
        """
        try:
            result = task.result()
        except BaseException as ex:  # also forwards cancellation
            future.set_exception(ex)
        else:
            future.set_result(result)

    def _run_until_complete(self, future, coro):
        """
        Loop-thread half of run_until_complete().
        """
        # This runs as a callback of the already running loop, where
        # loop.run_until_complete() raises RuntimeError and would leave
        # 'future' unresolved forever. Schedule the coroutine as a task
        # instead and forward its outcome once it is done.
        try:
            task = self._ensure_future(coro)
            task.add_done_callback(
                functools.partial(self._forward_result, future))
        except Exception as ex:
            future.set_exception(ex)

    def run_until_complete(self, coro):
        """
        threadsafe run_until_complete from asyncio

        Blocks the calling thread until coro has finished in the loop
        thread, then returns its result or raises its exception.
        """
        future = Future()
        p = functools.partial(self._run_until_complete, future, coro)
        self.loop.call_soon_threadsafe(p)
        return future.result()
195
196
197
198