Completed
Push — master ( 305c00...fd61ee )
by Olivier
01:39
created

SocketWrapper   A

Complexity

Total Complexity 6

Size/Duplication

Total Lines 26
Duplicated Lines 0 %

Test Coverage

Coverage 88.24%

Importance

Changes 2
Bugs 0 Features 0
Metric Value
dl 0
loc 26
ccs 15
cts 17
cp 0.8824
rs 10
c 2
b 0
f 0
wmc 6

3 Methods

Rating   Name   Duplication   Size   Complexity  
A write() 0 2 1
A __init__() 0 2 1
A read() 0 15 4
1 1
import logging
2 1
import os
3 1
from concurrent.futures import Future
4 1
import functools
5 1
import threading
6 1
from socket import error as SocketError
7
8 1
try:
9
    # we prefer to use the bundled asyncio version, otherwise fall back to trollius
10 1
    import asyncio
11
except ImportError:
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable ImportError does not seem to be defined.
Loading history...
12
    import trollius as asyncio
13
14
15 1
from opcua.common.uaerrors import UaError
16
17
18 1
class ServiceError(UaError):
    """
    UaError subclass with the fixed message 'UA Service Error'.
    The code given to the constructor is stored in self.code
    """

    def __init__(self, code):
        # the code is not part of the message, it is only kept on the instance
        super(ServiceError, self).__init__('UA Service Error')
        self.code = code
22
23
24 1
class NotEnoughData(UaError):
    """
    raised by Buffer.read and Buffer.skip when more bytes are
    requested than are left in the buffer
    """
    pass
26
27
28 1
class SocketClosedException(UaError):
    """
    raised by SocketWrapper.read when receiving from the socket fails
    or the socket returns no data
    """
    pass
30
31
32 1
class Buffer(object):

    """
    alternative to io.BytesIO making debug easier
    and adding a few convenience methods

    A Buffer is a window over 'data': it starts at the current position
    and spans 'size' bytes. Reading or skipping moves the start of the
    window forward; the underlying data object is never modified.
    """

    def __init__(self, data, start_pos=0, size=-1):
        """
        data: object supporting len() and slicing
        start_pos: index in data where the window starts
        size: number of bytes in the window, -1 means everything
              from start_pos to the end of data
        """
        self._data = data
        self._cur_pos = start_pos
        if size == -1:
            size = len(data) - start_pos
        self._size = size

    def __str__(self):
        return "Buffer(size:{}, data:{})".format(
            self._size,
            self._data[self._cur_pos:self._cur_pos + self._size])
    __repr__ = __str__

    def __len__(self):
        """
        number of bytes left in the window
        """
        return self._size

    def _consume(self, size):
        """
        move the window forward by size bytes and return the position
        the window started at before the move.
        Raises NotEnoughData if size is bigger than what is left
        """
        if size > self._size:
            raise NotEnoughData("Not enough data left in buffer, request for {}, we have {}".format(size, self))
        start = self._cur_pos
        self._size -= size
        self._cur_pos += size
        return start

    def read(self, size):
        """
        read and pop number of bytes for buffer.
        Raises NotEnoughData if size is bigger than what is left
        """
        start = self._consume(size)
        return self._data[start:self._cur_pos]

    def copy(self, size=-1):
        """
        return a shallow copy, optionally only copy 'size' bytes.
        The copy shares the underlying data but has its own position,
        so reading from it does not consume this buffer
        """
        # -1 or a too large request both mean "everything that is left"
        if size == -1 or size > self._size:
            size = self._size
        return Buffer(self._data, self._cur_pos, size)

    def skip(self, size):
        """
        skip size bytes in buffer.
        Raises NotEnoughData if size is bigger than what is left
        """
        self._consume(size)
86
87
88 1
class SocketWrapper(object):
    """
    wrapper to make it possible to have same api for
    normal sockets, socket from asyncio, StringIO, etc....
    """

    def __init__(self, sock):
        # sock must provide recv(size) and sendall(data)
        self.socket = sock

    def read(self, size):
        """
        Receive exactly size bytes from socket.
        recv is called repeatedly until size bytes have been collected.
        Raises SocketClosedException if recv fails or returns no data
        before that. Returns an empty bytes object if size is 0 or less
        """
        chunks = []
        while size > 0:
            try:
                chunk = self.socket.recv(size)
            except (OSError, SocketError) as ex:
                raise SocketClosedException("Server socket has closed", ex)
            if not chunk:
                # an empty chunk means the other side closed the connection
                raise SocketClosedException("Server socket has closed")
            chunks.append(chunk)
            size -= len(chunk)
        # join once instead of re-copying the accumulated data per chunk
        return b''.join(chunks)

    def write(self, data):
        """
        send all of data through the socket
        """
        self.socket.sendall(data)
114
115
116 1
def create_nonce(size=32):
    """
    return size random bytes obtained from os.urandom
    """
    return os.urandom(size)
118
119
120 1
class ThreadLoop(threading.Thread):
    """
    run an asyncio loop in a thread

    The loop is created inside the thread by run(). The other methods
    hand work over to it with call_soon_threadsafe, so they are meant
    to be called from other threads once start() has returned.
    """

    def __init__(self):
        threading.Thread.__init__(self)
        self.logger = logging.getLogger(__name__)
        self.loop = None
        # used by run() to tell start() that self.loop has been created
        self._cond = threading.Condition()

    def start(self):
        """
        start the thread and wait until it has created its event loop
        """
        with self._cond:
            threading.Thread.start(self)
            self._cond.wait()

    def run(self):
        self.logger.debug("Starting subscription thread")
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)
        with self._cond:
            self._cond.notify_all()
        self.loop.run_forever()
        self.logger.debug("subscription thread ended")

    def create_server(self, proto, hostname, port):
        """
        return the result of loop.create_server unchanged
        """
        return self.loop.create_server(proto, hostname, port)

    def stop(self):
        """
        stop subscription loop, thus the subscription thread
        """
        self.loop.call_soon_threadsafe(self.loop.stop)

    def call_soon(self, callback):
        """
        threadsafe call_soon from asyncio
        """
        self.loop.call_soon_threadsafe(callback)

    def call_later(self, delay, callback):
        """
        threadsafe call_later from asyncio
        """
        p = functools.partial(self.loop.call_later, delay, callback)
        self.loop.call_soon_threadsafe(p)

    @staticmethod
    def _ensure_future(coro, loop):
        """
        schedule coro on loop and return the resulting task
        """
        # 'async' is a reserved word since python 3.7, so the old function
        # name can only be looked up dynamically, on versions of
        # asyncio/trollius which do not provide ensure_future
        schedule = getattr(asyncio, "ensure_future", None)
        if schedule is None:
            schedule = getattr(asyncio, "async")
        return schedule(coro, loop=loop)

    def _create_task(self, future, coro, cb=None):
        """
        executed in the loop thread: create the task and hand it over
        to the waiting thread through future
        """
        try:
            task = self._ensure_future(coro, self.loop)
            if cb:
                task.add_done_callback(cb)
        except Exception as ex:
            # report the failure instead of leaving the caller blocked
            future.set_exception(ex)
            return
        future.set_result(task)

    def create_task(self, coro, cb=None):
        """
        threadsafe create_task from asyncio.
        Blocks until the task has been created in the loop thread and
        returns it. If cb is given it is added as done callback
        """
        future = Future()
        p = functools.partial(self._create_task, future, coro, cb)
        self.loop.call_soon_threadsafe(p)
        return future.result()

    def run_coro_and_wait(self, coro):
        """
        run coro in the loop thread, block until it is done and
        return its result
        """
        done = threading.Event()

        def cb(_):
            done.set()

        task = self.create_task(coro, cb)
        done.wait()
        return task.result()

    @staticmethod
    def _forward_result(future, task):
        """
        copy the outcome of a finished task into future
        """
        if task.cancelled():
            future.cancel()
            return
        ex = task.exception()
        if ex is not None:
            future.set_exception(ex)
        else:
            future.set_result(task.result())

    def _run_until_complete(self, future, coro):
        """
        executed in the loop thread: run coro as a task and forward its
        outcome to future when it is done
        """
        # loop.run_until_complete cannot be used here: the loop is already
        # running, so it would raise and future would never be resolved
        try:
            task = self._ensure_future(coro, self.loop)
        except Exception as ex:
            future.set_exception(ex)
            return
        task.add_done_callback(functools.partial(self._forward_result, future))

    def run_until_complete(self, coro):
        """
        threadsafe run_until_complete from asyncio.
        Blocks until coro is done, then returns its result or raises
        the exception it raised
        """
        future = Future()
        p = functools.partial(self._run_until_complete, future, coro)
        self.loop.call_soon_threadsafe(p)
        return future.result()
202
203
204
205