Passed
Pull Request — master (#47)
by
unknown
02:39
created

asyncua.common.utils.Buffer.__bytes__()   A

Complexity

Conditions 1

Size

Total Lines 3
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 2
nop 1
dl 0
loc 3
rs 10
c 0
b 0
f 0
1
"""
2
Helper function and classes that do not rely on asyncua library.
3
Helper function and classes depending on ua object are in ua_utils.py
4
"""
5
6
import os
7
import logging
8
from ..ua.uaerrors import UaError
9
10
_logger = logging.getLogger(__name__)
11
12
13
class ServiceError(UaError):
14
    def __init__(self, code):
15
        super(ServiceError, self).__init__('UA Service Error')
16
        self.code = code
17
18
19
class NotEnoughData(UaError):
20
    pass
21
22
23
class SocketClosedException(UaError):
24
    pass
25
26
27
class Buffer:
28
    """
29
    Alternative to io.BytesIO making debug easier
30
    and added a few convenience methods.
31
    """
32
33
    def __init__(self, data, start_pos=0, size=-1):
34
        self._data = data
35
        self._cur_pos = start_pos
36
        if size == -1:
37
            size = len(data) - start_pos
38
        self._size = size
39
40
    def __str__(self):
41
        return f"Buffer(size:{self._size}, data:{self._data[self._cur_pos:self._cur_pos + self._size]})"
42
    __repr__ = __str__
43
44
    def __len__(self):
45
        return self._size
46
47
    def __bytes__(self):
48
        """Return remains of buffer as bytes."""
49
        return self._data[self._cur_pos:]
50
51
    def read(self, size):
52
        """
53
        read and pop number of bytes for buffer
54
        """
55
        if size > self._size:
56
            raise NotEnoughData(f"Not enough data left in buffer, request for {size}, we have {self._size}")
57
        self._size -= size
58
        pos = self._cur_pos
59
        self._cur_pos += size
60
        return self._data[pos:self._cur_pos]
61
62
    def copy(self, size=-1):
63
        """
64
        return a shadow copy, optionally only copy 'size' bytes
65
        """
66
        if size == -1 or size > self._size:
67
            size = self._size
68
        return Buffer(self._data, self._cur_pos, size)
69
70
    def skip(self, size):
71
        """
72
        skip size bytes in buffer
73
        """
74
        if size > self._size:
75
            raise NotEnoughData(f"Not enough data left in buffer, request for {size}, we have {self._size}")
76
        self._size -= size
77
        self._cur_pos += size
78
79
80
def create_nonce(size=32):
81
    return os.urandom(size)
82