Passed
Pull Request — master (#372)
by
unknown
02:19
created

asyncua.client.ua_file   A

Complexity

Total Complexity 5

Size/Duplication

Total Lines 37
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 25
dl 0
loc 37
rs 10
c 0
b 0
f 0
wmc 5

5 Methods

Rating   Name   Duplication   Size   Complexity  
A UaFile._open_file() 0 5 1
A UaFile.read_file() 0 13 1
A UaFile._get_file_size() 0 4 1
A UaFile._close_file() 0 5 1
A UaFile.__init__() 0 2 1
1
from asyncua import ua
2
3
class UaFile:
4
    def __init__(self, file_node):
5
        self._file_node = file_node
6
7
    async def read_file(self):
8
        """
9
        :param file_node: node of file object (e.g. node = client.get_node("ns=2;s=nameOfNode")
10
        """
11
        handle = await self._open_file(ua.OpenFileMode.Read.value)
12
        size = await self._get_file_size()
13
14
        read_node = await self._file_node.get_child("Read")
15
        arg1 = ua.Variant(handle, ua.VariantType.UInt32)
16
        arg2 = ua.Variant(size, ua.VariantType.Int32)
17
        contents = await self._file_node.call_method(read_node, arg1, arg2)
18
        await self._close_file(handle)
19
        return contents
20
21
    async def _open_file(self, open_mode):
22
        """ open file method """
23
        open_node = await self._file_node.get_child("Open")
24
        arg = ua.Variant(open_mode, ua.VariantType.Byte)
25
        return await self._file_node.call_method(open_node, arg)
26
27
    async def _get_file_size(self):
28
        """ gets size of file """
29
        size_node = await self._file_node.get_child("Size")
30
        return await size_node.read_value()
31
32
    async def _close_file(self, handle):
33
        """ close file method """
34
        read_node = await self._file_node.get_child("Close")
35
        arg1 = ua.Variant(handle, ua.VariantType.UInt32)
36
        return await self._file_node.call_method(read_node, arg1)
37