Completed
Push — dev ( 3d1b3a...abb445 )
by Olivier
02:17
created

opcua.Argument   A

Complexity

Total Complexity 1

Size/Duplication

Total Lines 5
Duplicated Lines 0 %

Test Coverage

Coverage 100%
Metric Value
wmc 1
dl 0
loc 5
ccs 4
cts 4
cp 1
rs 10

1 Method

Rating   Name   Duplication   Size   Complexity  
A __init__() 0 3 1
1 1
import struct
2 1
import logging
3
4 1
import opcua.uaprotocol_auto as auto
5 1
import opcua.uatypes as uatypes
6 1
from opcua.uatypes import uatype_UInt32
7 1
import opcua.utils as utils
8 1
from opcua.object_ids import ObjectIds
9 1
from opcua.attribute_ids import AttributeIds
10
11 1
logger = logging.getLogger('opcua.uaprotocol')
12
13 1
OPC_TCP_SCHEME = 'opc.tcp'
14
15 1
class AccessLevelMask(object):
16
    """
17
    used by AccessLevel and UserAccessLevel
18
    """
19 1
    CurrentRead = 0
20 1
    CurrentWrite = 1
21 1
    HistoryRead = 2
22 1
    HistoryWrite = 3
23 1
    SemanticChange = 4
24
25
26 1
class Hello(uatypes.FrozenClass):
27
28 1
    def __init__(self):
29 1
        self.ProtocolVersion = 0
30 1
        self.ReceiveBufferSize = 65536
31 1
        self.SendBufferSize = 65536
32 1
        self.MaxMessageSize = 0
33 1
        self.MaxChunkCount = 0
34 1
        self.EndpointUrl = ""
35 1
        self._freeze()
36
37 1
    def to_binary(self):
38 1
        b = []
39 1
        b.append(uatype_UInt32.pack(self.ProtocolVersion))
40 1
        b.append(uatype_UInt32.pack(self.ReceiveBufferSize))
41 1
        b.append(uatype_UInt32.pack(self.SendBufferSize))
42 1
        b.append(uatype_UInt32.pack(self.MaxMessageSize))
43 1
        b.append(uatype_UInt32.pack(self.MaxChunkCount))
44 1
        b.append(uatypes.pack_string(self.EndpointUrl))
45 1
        return b"".join(b)
46
47 1
    @staticmethod
48
    def from_binary(data):
49 1
        hello = Hello()
50 1
        hello.ProtocolVersion = uatype_UInt32.unpack(data.read(4))[0]
51 1
        hello.ReceiveBufferSize = uatype_UInt32.unpack(data.read(4))[0]
52 1
        hello.SendBufferSize = uatype_UInt32.unpack(data.read(4))[0]
53 1
        hello.MaxMessageSize = uatype_UInt32.unpack(data.read(4))[0]
54 1
        hello.MaxChunkCount = uatype_UInt32.unpack(data.read(4))[0]
55 1
        hello.EndpointUrl = uatypes.unpack_string(data)
56 1
        return hello
57
58
59 1
class MessageType(object):
60 1
    Invalid = b"INV"  # FIXME: check value
61 1
    Hello = b"HEL"
62 1
    Acknowledge = b"ACK"
63 1
    Error = b"ERR"
64 1
    SecureOpen = b"OPN"
65 1
    SecureClose = b"CLO"
66 1
    SecureMessage = b"MSG"
67
68
69 1
class ChunkType(object):
70 1
    Invalid = b"0"  # FIXME check
71 1
    Single = b"F"
72 1
    Intermediate = b"C"
73 1
    Final = b"A"
74
75
76 1
class Header(uatypes.FrozenClass):
77
78 1
    def __init__(self, msgType=None, chunkType=None, channelid=0):
79 1
        self.MessageType = msgType
80 1
        self.ChunkType = chunkType
81 1
        self.ChannelId = channelid
82 1
        self.body_size = 0
83 1
        self.packet_size = 0
84 1
        self._freeze()
85
86 1
    def add_size(self, size):
87 1
        self.body_size += size
88
89 1
    def to_binary(self):
90 1
        b = []
91 1
        b.append(struct.pack("<3ss", self.MessageType, self.ChunkType))
92 1
        size = self.body_size + 8
93 1
        if self.MessageType in (MessageType.SecureOpen, MessageType.SecureClose, MessageType.SecureMessage):
94 1
            size += 4
95 1
        b.append(uatype_UInt32.pack(size))
96 1
        if self.MessageType in (MessageType.SecureOpen, MessageType.SecureClose, MessageType.SecureMessage):
97 1
            b.append(uatype_UInt32.pack(self.ChannelId))
98 1
        return b"".join(b)
99
100 1
    @staticmethod
101
    def from_string(data):
102 1
        hdr = Header()
103 1
        hdr.MessageType, hdr.ChunkType, hdr.packet_size = struct.unpack("<3scI", data.read(8))
104 1
        hdr.body_size = hdr.packet_size - 8
105 1
        if hdr.MessageType in (MessageType.SecureOpen, MessageType.SecureClose, MessageType.SecureMessage):
106 1
            hdr.body_size -= 4
107 1
            hdr.ChannelId = uatype_UInt32.unpack(data.read(4))[0]
108 1
        return hdr
109
110 1
    def __str__(self):
111
        return "Header(type:{}, chunk_type:{}, body_size:{}, channel:{})".format(self.MessageType, self.ChunkType, self.body_size, self.ChannelId)
112 1
    __repr__ = __str__
113
114
115 1
class ErrorMessage(uatypes.FrozenClass):
116
117 1
    def __init__(self):
118
        self.Error = uatypes.StatusCode()
119
        self.Reason = ""
120
        self._freeze()
121
122 1
    def to_binary(self):
123
        b = []
124
        b.append(self.Error.to_binary())
125
        b.append(uatypes.pack_string(self.Reason))
126
        return b"".join(b)
127
128 1
    @staticmethod
129
    def from_binary(data):
130
        ack = ErrorMessage()
131
        ack.Error = uatypes.StatusCode.from_binary(data)
132
        ack.Reason = uatypes.unpack_string(data)
133
        return ack
134
135 1
    def __str__(self):
136
        return "MessageAbort(error:{}, reason:{})".format(self.Error, self.Reason)
137 1
    __repr__ = __str__
138
139
140 1
class Acknowledge(uatypes.FrozenClass):
141
142 1
    def __init__(self):
143 1
        self.ProtocolVersion = 0
144 1
        self.ReceiveBufferSize = 65536
145 1
        self.SendBufferSize = 65536
146 1
        self.MaxMessageSize = 0  # No limits
147 1
        self.MaxChunkCount = 0  # No limits
148 1
        self._freeze()
149
150 1
    def to_binary(self):
151 1
        return struct.pack(
152
            "<5I",
153
            self.ProtocolVersion,
154
            self.ReceiveBufferSize,
155
            self.SendBufferSize,
156
            self.MaxMessageSize,
157
            self.MaxChunkCount)
158
159 1
    @staticmethod
160
    def from_binary(data):
161 1
        ack = Acknowledge()
162 1
        ack.ProtocolVersion, ack.ReceiveBufferSize, ack.SendBufferSize, ack.MaxMessageSize, ack.MaxChunkCount \
163
            = struct.unpack("<5I", data.read(20))
164 1
        return ack
165
166
167 1
class AsymmetricAlgorithmHeader(uatypes.FrozenClass):
168
169 1
    def __init__(self):
170 1
        self.SecurityPolicyURI = "http://opcfoundation.org/UA/SecurityPolicy#None"
171 1
        self.SenderCertificate = b""
172 1
        self.ReceiverCertificateThumbPrint = b""
173 1
        self._freeze()
174
175 1
    def to_binary(self):
176 1
        b = []
177 1
        b.append(uatypes.pack_string(self.SecurityPolicyURI))
178 1
        b.append(uatypes.pack_string(self.SenderCertificate))
179 1
        b.append(uatypes.pack_string(self.ReceiverCertificateThumbPrint))
180 1
        return b"".join(b)
181
182 1
    @staticmethod
183
    def from_binary(data):
184 1
        hdr = AsymmetricAlgorithmHeader()
185 1
        hdr.SecurityPolicyURI = uatypes.unpack_bytes(data)
186 1
        hdr.SenderCertificate = uatypes.unpack_bytes(data)
187 1
        hdr.ReceiverCertificateThumbPrint = uatypes.unpack_bytes(data)
188 1
        return hdr
189
190 1
    def __str__(self):
191
        return "{}(SecurytyPolicy:{}, certificatesize:{}, receiverCertificatesize:{} )".format(self.__class__.__name__, self.SecurityPolicyURI, len(self.SenderCertificate), len(self.ReceiverCertificateThumbPrint))
192 1
    __repr__ = __str__
193
194
195 1
class SymmetricAlgorithmHeader(uatypes.FrozenClass):
196
197 1
    def __init__(self):
198 1
        self.TokenId = 0
199 1
        self._freeze()
200
201 1
    @staticmethod
202
    def from_binary(data):
203 1
        obj = SymmetricAlgorithmHeader()
204 1
        obj.TokenId = uatype_UInt32.unpack(data.read(4))[0]
205 1
        return obj
206
207 1
    def to_binary(self):
208 1
        return uatype_UInt32.pack(self.TokenId)
209
210 1
    def __str__(self):
211
        return "{}(TokenId:{} )".format(self.__class__.__name__, self.TokenId)
212 1
    __repr__ = __str__
213
214
215 1
class SequenceHeader(uatypes.FrozenClass):
216
217 1
    def __init__(self):
218 1
        self.SequenceNumber = None
219 1
        self.RequestId = None
220 1
        self._freeze()
221
222 1
    @staticmethod
223
    def from_binary(data):
224 1
        obj = SequenceHeader()
225 1
        obj.SequenceNumber = uatype_UInt32.unpack(data.read(4))[0]
226 1
        obj.RequestId = uatype_UInt32.unpack(data.read(4))[0]
227 1
        return obj
228
229 1
    def to_binary(self):
230 1
        b = []
231 1
        b.append(uatype_UInt32.pack(self.SequenceNumber))
232 1
        b.append(uatype_UInt32.pack(self.RequestId))
233 1
        return b"".join(b)
234
235 1
    def __str__(self):
236
        return "{}(SequenceNumber:{}, RequestId:{} )".format(self.__class__.__name__, self.SequenceNumber, self.RequestId)
237 1
    __repr__ = __str__
238
239
# FIXES for missing switchfield in NodeAttributes classes
240 1
ana = auto.NodeAttributesMask
241
242
243 1
class ObjectAttributes(auto.ObjectAttributes):
244
245 1
    def __init__(self):
246 1
        auto.ObjectAttributes.__init__(self)
247 1
        self.SpecifiedAttributes = ana.DisplayName | ana.Description | ana.WriteMask | ana.UserWriteMask | ana.EventNotifier
248
249
250 1
class ObjectTypeAttributes(auto.ObjectTypeAttributes):
251
252 1
    def __init__(self):
253 1
        auto.ObjectTypeAttributes.__init__(self)
254 1
        self.SpecifiedAttributes = ana.DisplayName | ana.Description | ana.WriteMask | ana.UserWriteMask | ana.IsAbstract
255
256
257 1
class VariableAttributes(auto.VariableAttributes):
258
259 1
    def __init__(self):
260 1
        auto.VariableAttributes.__init__(self)
261 1
        self.SpecifiedAttributes = ana.DisplayName | ana.Description | ana.WriteMask | ana.UserWriteMask | ana.Value | ana.DataType | ana.ValueRank | ana.ArrayDimensions | ana.AccessLevel | ana.UserAccessLevel | ana.MinimumSamplingInterval | ana.Historizing
262 1
        self.Historizing = False
263
264
265 1
class VariableTypeAttributes(auto.VariableTypeAttributes):
266
267 1
    def __init__(self):
268 1
        auto.VariableTypeAttributes.__init__(self)
269 1
        self.SpecifiedAttributes = ana.DisplayName | ana.Description | ana.WriteMask | ana.UserWriteMask | ana.Value | ana.DataType | ana.ValueRank | ana.ArrayDimensions | ana.IsAbstract
270
271
272 1
class MethodAttributes(auto.MethodAttributes):
273
274 1
    def __init__(self):
275 1
        auto.MethodAttributes.__init__(self)
276 1
        self.SpecifiedAttributes = ana.DisplayName | ana.Description | ana.WriteMask | ana.UserWriteMask | ana.Executable | ana.UserExecutable
277
278
279 1
class ReferenceTypeAttributes(auto.ReferenceTypeAttributes):
280
281 1
    def __init__(self):
282 1
        auto.ReferenceTypeAttributes.__init__(self)
283 1
        self.SpecifiedAttributes = ana.DisplayName | ana.Description | ana.WriteMask | ana.UserWriteMask | ana.IsAbstract | ana.Symmetric | ana.InverseName
284
285
286 1
class DataTypeAttributes(auto.DataTypeAttributes):
287
288 1
    def __init__(self):
289 1
        auto.DataTypeAttributes.__init__(self)
290 1
        self.SpecifiedAttributes = ana.DisplayName | ana.Description | ana.WriteMask | ana.UserWriteMask | ana.IsAbstract
291
292
293 1
class ViewAttributes(auto.ViewAttributes):
294
295 1
    def __init__(self):
296
        auto.ViewAttributes.__init__(self)
297
        self.SpecifiedAttributes = ana.DisplayName | ana.Description | ana.WriteMask | ana.UserWriteMask | ana.ContainsNoLoops | ana.EventNotifier
298
299
300 1
class Argument(auto.Argument):
301
302 1
    def __init__(self):
303 1
        auto.Argument.__init__(self)
304 1
        self.ValueRank = -2
305
306
307
AttributeIdsInv = {v: k for k, v in AttributeIds.__dict__.items()}
308