Completed
Push — dev ( 3d1b3a...abb445 )
by Olivier
02:17
created

opcua.Buffer.__len__()   A

Complexity

Conditions 1

Size

Total Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1
Metric Value
dl 0
loc 2
ccs 2
cts 2
cp 1
rs 10
cc 1
crap 1
1 1
import logging
2 1
import uuid
3 1
from concurrent.futures import Future
4 1
import functools
5 1
import threading
6 1
try:
7
    # we prefer to use bundles asyncio version, otherwise fallback to trollius
8 1
    import asyncio
9
except ImportError:
10
    import trollius as asyncio
11
    from trollius import From
12
13 1
class ServiceError(Exception):
    """
    Exception carrying the code of a UA service error.

    The message is always 'UA service error'; the value passed to the
    constructor is stored unchanged in the ``code`` attribute.
    """

    def __init__(self, code):
        super(ServiceError, self).__init__('UA service error')
        self.code = code
17
18 1
class NotEnoughData(Exception):
    """
    Exception type for "not enough data" conditions.
    """
    pass
20
21 1
class Buffer(object):

    """
    Alternative to io.BytesIO making debug easier
    and adding a few convenience methods.

    The wrapped data is never modified by reads: a read offset
    (``rsize``) records how many leading bytes were already consumed.
    """

    def __init__(self, data):
        self._data = data
        self.rsize = 0  # number of bytes of _data already consumed

    def __str__(self):
        return "Buffer(size:{}, data:{})".format(len(self), self.data)
    __repr__ = __str__

    def __len__(self):
        """
        number of bytes not consumed yet
        """
        return len(self._data) - self.rsize

    def read(self, size):
        """
        read and pop 'size' bytes from buffer

        raises NotEnoughData (an Exception subclass) when fewer than
        'size' bytes are left; the buffer is left untouched in that case
        """
        rsize = self.rsize
        nrsize = rsize + size
        mydata = self._data
        if nrsize > len(mydata):
            raise NotEnoughData("Not enough data left in buffer, request for {}, we have {}".format(size, self))
        data = mydata[rsize:nrsize]
        self.rsize = nrsize
        return data

    def copy(self, size=None):
        """
        return a copy, optionally only copy 'size' bytes

        the copy starts at the current read position; no check is made
        that 'size' bytes are actually available
        """
        if size is None:
            return Buffer(self._data[self.rsize:])
        else:
            return Buffer(self._data[self.rsize:self.rsize + size])

    def test_read(self, size):
        """
        read 'size' bytes from buffer, without removing them from buffer

        raises NotEnoughData when fewer than 'size' bytes are left
        """
        if self.rsize + size > len(self._data):
            raise NotEnoughData("Not enough data left in buffer, request for {}, we have {}".format(size, self))
        return self._data[self.rsize:self.rsize + size]

    def get_data(self):
        """
        return the bytes not consumed yet
        """
        return self._data[self.rsize:]

    def set_data(self, v):
        """
        replace the wrapped data and reset the read position
        """
        self._data = v
        self.rsize = 0

    data = property(get_data, set_data)
80
81
82 1
class SocketClosedException(Exception):
    """
    Raised by SocketWrapper.read when the socket returns no more data
    before the requested number of bytes was received.
    """
    pass
84
85 1
class SocketWrapper(object):
    """
    wrapper to make it possible to have same api for
    normal sockets, socket from asyncio, StringIO, etc....

    The wrapped object only needs to provide recv(size) and sendall(data).
    """
    def __init__(self, sock):
        self.socket = sock

    def read(self, size):
        """
        Receive exactly size bytes from socket

        recv() may return fewer bytes than asked for, so keep calling it
        until the full amount has arrived.
        Raises SocketClosedException if recv() returns an empty chunk
        before size bytes were received.
        """
        # collect the pieces and join once instead of repeated bytes +=
        chunks = []
        while size > 0:
            chunk = self.socket.recv(size)
            if not chunk:
                raise SocketClosedException("Server socket has closed")
            chunks.append(chunk)
            size -= len(chunk)
        return b''.join(chunks)

    def write(self, data):
        """
        Send all of data through the wrapped socket
        """
        self.socket.sendall(data)
108
109
110
111 1
def create_nonce(size=32):
    """
    Return 'size' random bytes (32 by default) to be used as a nonce.

    The bytes come from os.urandom, so every bit is random; the previous
    approach of concatenating two uuid4 values gave the same length but
    contained the fixed UUID version and variant bits.
    """
    import os  # local import so the block does not depend on file-level imports
    return os.urandom(size)
113
114
115
116
117 1
class ThreadLoop(threading.Thread):
    """
    run an asyncio loop in a thread

    All public helpers hand their work to the loop through
    call_soon_threadsafe, so they are meant to be called from threads
    other than the loop thread. Helpers that wait for a result
    (create_task, run_coro_and_wait, run_until_complete) block the
    calling thread and must therefore not be called from the loop
    thread itself.
    """

    def __init__(self):
        threading.Thread.__init__(self)
        self.logger = logging.getLogger(__name__)
        self.loop = None  # set by run() once the thread has created its loop
        self._cond = threading.Condition()

    def start(self):
        """
        start the thread and block until its event loop has been created
        """
        with self._cond:
            threading.Thread.start(self)
            # loop on the predicate so a spurious wakeup cannot return
            # before self.loop is usable
            while self.loop is None:
                self._cond.wait()

    def run(self):
        self.logger.debug("Starting subscription thread")
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        with self._cond:
            # publish the loop under the lock, then wake up start()
            self.loop = loop
            self._cond.notify_all()
        self.loop.run_forever()
        self.logger.debug("subscription thread ended")

    def create_server(self, proto, hostname, port):
        """
        return the result of loop.create_server(proto, hostname, port)
        (a coroutine in asyncio); it is not scheduled here
        """
        return self.loop.create_server(proto, hostname, port)

    def stop(self):
        """
        stop subscription loop, thus the subscription thread
        """
        self.loop.call_soon_threadsafe(self.loop.stop)

    def call_soon(self, callback):
        """
        threadsafe call_soon from asyncio
        """
        self.loop.call_soon_threadsafe(callback)

    def call_later(self, delay, callback):
        """
        threadsafe call_later from asyncio
        """
        p = functools.partial(self.loop.call_later, delay, callback)
        self.loop.call_soon_threadsafe(p)

    def _ensure_future(self, coro):
        """
        wrap coro in a task on our loop; must run in the loop thread
        """
        schedule = getattr(asyncio, "ensure_future", None)
        if schedule is None:
            # releases predating ensure_future expose the same function
            # under the name 'async'; that name is a keyword in recent
            # Python, so it can only be reached through getattr
            schedule = getattr(asyncio, "async")
        return schedule(coro, loop=self.loop)

    def _create_task(self, future, coro, cb=None):
        # runs in the loop thread
        try:
            task = self._ensure_future(coro)
            if cb:
                task.add_done_callback(cb)
        except Exception as ex:
            # report the failure instead of leaving the caller blocked
            # forever on future.result()
            future.set_exception(ex)
        else:
            future.set_result(task)

    def create_task(self, coro, cb=None):
        """
        threadsafe create_task from asyncio

        blocks until the task has been created in the loop thread and
        returns it; cb, if given, is added as done callback of the task
        """
        future = Future()
        p = functools.partial(self._create_task, future, coro, cb)
        self.loop.call_soon_threadsafe(p)
        return future.result()

    def run_coro_and_wait(self, coro):
        """
        run coro in the loop, block until it is done and return its
        result (or raise the exception it raised)
        """
        done = threading.Event()

        def cb(_):
            done.set()

        task = self.create_task(coro, cb)
        done.wait()
        return task.result()

    def _run_until_complete(self, future, coro):
        # runs in the loop thread; the loop is already running, so
        # loop.run_until_complete cannot be used here. Schedule a task
        # instead and hand it over once it is done.
        try:
            task = self._ensure_future(coro)
            task.add_done_callback(future.set_result)
        except Exception as ex:
            future.set_exception(ex)

    def run_until_complete(self, coro):
        """
        threadsafe run_until_completed from asyncio

        blocks until coro has finished in the loop thread and returns
        its result (or raises the exception it raised)
        """
        future = Future()
        p = functools.partial(self._run_until_complete, future, coro)
        self.loop.call_soon_threadsafe(p)
        # the future carries the finished task; unwrap its result
        return future.result().result()
199
200
201
202