Completed
Push — master ( 510f7f...5d9c38 )
by Olivier
02:07
created

opcua.Buffer.skip()   A

Complexity

Conditions 2

Size

Total Lines 8

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 4.048
Metric Value
dl 0
loc 8
ccs 1
cts 5
cp 0.2
rs 9.4286
cc 2
crap 4.048
1 1
import logging
2 1
import os
3 1
from concurrent.futures import Future
4 1
import functools
5 1
import threading
6 1
try:
7
    # we prefer to use bundles asyncio version, otherwise fallback to trollius
8 1
    import asyncio
9
except ImportError:
10
    import trollius as asyncio
11
    from trollius import From
12
13 1
class ServiceError(Exception):
14 1
    def __init__(self, code):
15
        super(ServiceError, self).__init__('UA service error')
16
        self.code = code
17
18 1
class NotEnoughData(Exception):
19 1
    pass
20
21 1
class Buffer(object):
22
23
    """
24
    alternative to io.BytesIO making debug easier
25
    and added a few conveniance methods
26
    """
27
28 1
    def __init__(self, data, start_pos=0, size=-1):
29
        # self.logger = logging.getLogger(__name__)
30 1
        self._data = data
31 1
        self._cur_pos = start_pos
32
        if size == -1:
33 1
            size = len(data) - start_pos
34
        self._size = size
35 1
36
    def __str__(self):
37 1
        return "Buffer(size:{}, data:{})".format(
38 1
            self._size,
39
            self._data[self._cur_pos:self._cur_pos + self._size])
40 1
    __repr__ = __str__
41
42
    def __len__(self):
43
        return self._size
44 1
45 1
    def read(self, size):
46 1
        """
47 1
        read and pop number of bytes for buffer
48
        """
49
        if size > self._size:
50 1
            raise NotEnoughData("Not enough data left in buffer, request for {}, we have {}".format(size, self))
51 1
        # self.logger.debug("Request for %s bytes, from %s", size, self)
52
        self._size -= size
53 1
        pos = self._cur_pos
54
        self._cur_pos += size
55 1
        data = self._data[pos:self._cur_pos]
56
        # self.logger.debug("Returning: %s ", data)
57
        return data
58
59 1
    def copy(self, size=-1):
60
        """
61
        return a shadow copy, optionnaly only copy 'size' bytes
62 1
        """
63
        if size == -1 or size > self._size:
64 1
            size = self._size
65
        return Buffer(self._data, self._cur_pos, size)
66
67
    def skip(self, size):
68
        """
69
        skip size bytes in buffer
70
        """
71
        if size > self._size:
72 1
            raise NotEnoughData("Not enough data left in buffer, request for {}, we have {}".format(size, self))
73
        self._size -= size
74
        self._cur_pos += size
75 1
76
77
78
class SocketClosedException(Exception):
79 1
    pass
80
81
class SocketWrapper(object):
82 1
    """
83 1
    wrapper to make it possible to have same api for 
84
    normal sockets, socket from asyncio, StringIO, etc....
85 1
    """
86
    def __init__(self, sock):
87
        self.socket = sock
88
89
    def read(self, size):
90 1
        """
91 1
        Receive up to size bytes from socket
92
        """
93 1
        data = b''
94
        while size > 0:
95
            chunk = self.socket.recv(size)
96
            if not chunk:
97 1
                raise SocketClosedException("Server socket has closed")
98 1
            data += chunk
99 1
            size -= len(chunk)
100 1
        return data
101 1
102 1
    def write(self, data):
103 1
        self.socket.sendall(data)
104 1
105
106 1
107 1
def create_nonce(size=32):
108
    return os.urandom(size)
109
110
111 1
112 1
113
class ThreadLoop(threading.Thread):
114
    """
115
    run an asyncio loop in a thread
116
    """
117 1
118
    def __init__(self):
119
        threading.Thread.__init__(self)
120
        self.logger = logging.getLogger(__name__)
121
        self.loop = None
122 1
        self._cond = threading.Condition()
123 1
124 1
    def start(self):
125 1
        with self._cond:
126 1
            threading.Thread.start(self)
127
            self._cond.wait()
128 1
129 1
    def run(self):
130 1
        self.logger.debug("Starting subscription thread")
131 1
        self.loop = asyncio.new_event_loop()
132
        asyncio.set_event_loop(self.loop)
133 1
        with self._cond:
134 1
            self._cond.notify_all()
135 1
        self.loop.run_forever()
136 1
        self.logger.debug("subscription thread ended")
137 1
138 1
    def create_server(self, proto, hostname, port):
139 1
        return self.loop.create_server(proto, hostname, port)
140 1
141
    def stop(self):
142 1
        """
143 1
        stop subscription loop, thus the subscription thread
144
        """
145 1
        self.loop.call_soon_threadsafe(self.loop.stop)
146
147
    def call_soon(self, callback):
148
        self.loop.call_soon_threadsafe(callback)
149 1
150
    def call_later(self, delay, callback):
151 1
        """
152 1
        threadsafe call_later from asyncio
153
        """
154 1
        p = functools.partial(self.loop.call_later, delay, callback)
155
        self.loop.call_soon_threadsafe(p)
156
157
    def _create_task(self, future, coro, cb=None):
158 1
        #task = self.loop.create_task(coro)
159 1
        task = asyncio.async(coro, loop=self.loop) 
160
        if cb:
161 1
            task.add_done_callback(cb)
162
        future.set_result(task)
163 1
164 1
    def create_task(self, coro, cb=None):
165 1
        """
166 1
        threadsafe create_task from asyncio
167
        """
168 1
        future = Future()
169
        p = functools.partial(self._create_task, future, coro, cb)
170
        self.loop.call_soon_threadsafe(p)
171
        return future.result()
172 1
173 1
    def run_coro_and_wait(self, coro):
174 1
        cond = threading.Condition()
175 1
        def cb(_):
176
            with cond:
177 1
                cond.notify_all()
178 1
        with cond:
179 1
            task = self.create_task(coro, cb)
180 1
            cond.wait()
181 1
        return task.result()
182 1
183 1
    def _run_until_complete(self, future, coro):
184 1
        task = self.loop.run_until_complete(coro)
185 1
        future.set_result(task)
186
187 1
    def run_until_complete(self, coro):
188
        """
189
        threadsafe run_until_completed from asyncio
190
        """
191 1
        future = Future()
192
        p = functools.partial(self._run_until_complete, future, coro)
193
        self.loop.call_soon_threadsafe(p)
194
        return future.result()
195
196
197
198