Code Duplication    Length = 80-82 lines in 2 locations

opcua/ua/uaprotocol_auto.py 2 locations

@@ 1996-2077 (lines=82) @@
1993
1994
    __repr__ = __str__
1995
1996
1997
class RegisteredServer(FrozenClass):
    '''
    The information required to register a server with a discovery server.

    ServerNames and DiscoveryUrls are array fields and default to empty
    lists. ``to_binary`` additionally accepts ``None`` for either of them
    and writes it as a null array (length -1).

    :ivar ServerUri:
    :vartype ServerUri: String
    :ivar ProductUri:
    :vartype ProductUri: String
    :ivar ServerNames:
    :vartype ServerNames: LocalizedText
    :ivar ServerType:
    :vartype ServerType: ApplicationType
    :ivar GatewayServerUri:
    :vartype GatewayServerUri: String
    :ivar DiscoveryUrls:
    :vartype DiscoveryUrls: String
    :ivar SemaphoreFilePath:
    :vartype SemaphoreFilePath: String
    :ivar IsOnline:
    :vartype IsOnline: Boolean
    '''
    def __init__(self, binary=None):
        '''
        Create an instance with default field values, or decode one from
        ``binary`` when it is given.

        :param binary: stream to decode from (``read(n)`` is called on it),
            or None to build a default-initialised instance
        '''
        if binary is not None:
            self._binary_init(binary)
            # NOTE(review): _freeze is presumably what makes FrozenClass
            # reject attributes added later -- confirm against FrozenClass.
            self._freeze = True
            return
        self.ServerUri = None
        self.ProductUri = None
        self.ServerNames = []
        self.ServerType = ApplicationType(0)
        self.GatewayServerUri = None
        self.DiscoveryUrls = []
        self.SemaphoreFilePath = None
        self.IsOnline = True
        self._freeze = True

    def to_binary(self):
        '''
        Serialise every field, in declaration order, into one bytes object.

        An array field that is ``None`` is written as a null array
        (length -1) instead of failing with TypeError on ``len(None)``.

        :return: the concatenated binary encoding of all fields
        '''
        packet = [
            pack_string(self.ServerUri),
            pack_string(self.ProductUri),
        ]
        if self.ServerNames is None:
            # -1 is the same marker _binary_init recognises as "no array".
            packet.append(uatype_Int32.pack(-1))
        else:
            packet.append(uatype_Int32.pack(len(self.ServerNames)))
            packet.extend(name.to_binary() for name in self.ServerNames)
        packet.append(uatype_UInt32.pack(self.ServerType.value))
        packet.append(pack_string(self.GatewayServerUri))
        if self.DiscoveryUrls is None:
            # NOTE(review): unpack_uatype_array may hand back None for a
            # null array -- confirm; this branch keeps such an object
            # re-encodable either way.
            packet.append(uatype_Int32.pack(-1))
        else:
            packet.append(uatype_Int32.pack(len(self.DiscoveryUrls)))
            packet.extend(pack_string(url) for url in self.DiscoveryUrls)
        packet.append(pack_string(self.SemaphoreFilePath))
        packet.append(uatype_Boolean.pack(self.IsOnline))
        return b''.join(packet)

    @staticmethod
    def from_binary(data):
        '''
        Decode a RegisteredServer from ``data``.

        :param data: stream to decode from
        :return: a new RegisteredServer
        '''
        return RegisteredServer(data)

    def _binary_init(self, data):
        '''
        Populate all fields by reading them from ``data`` in declaration
        order.

        :param data: stream to decode from (``read(n)`` is called on it)
        '''
        self.ServerUri = unpack_string(data)
        self.ProductUri = unpack_string(data)
        count = uatype_Int32.unpack(data.read(4))[0]
        # A length of -1 (null array) yields an empty range, so it decodes
        # to an empty list exactly like a length of 0.
        self.ServerNames = [LocalizedText.from_binary(data) for _ in range(count)]
        self.ServerType = ApplicationType(uatype_UInt32.unpack(data.read(4))[0])
        self.GatewayServerUri = unpack_string(data)
        self.DiscoveryUrls = unpack_uatype_array('String', data)
        self.SemaphoreFilePath = unpack_string(data)
        self.IsOnline = uatype_Boolean.unpack(data.read(1))[0]

    def __str__(self):
        '''
        Return ``RegisteredServer(Field:value, ...)`` listing every field
        in declaration order.
        '''
        field_names = (
            'ServerUri',
            'ProductUri',
            'ServerNames',
            'ServerType',
            'GatewayServerUri',
            'DiscoveryUrls',
            'SemaphoreFilePath',
            'IsOnline',
        )
        rendered = ', '.join(
            name + ':' + str(getattr(self, name)) for name in field_names
        )
        return 'RegisteredServer(' + rendered + ')'

    __repr__ = __str__
2079
2080
@@ 1770-1849 (lines=80) @@
1767
1768
    __repr__ = __str__
1769
1770
1771
class EndpointDescription(FrozenClass):
    '''
    The description of an endpoint that can be used to access a server.

    :ivar EndpointUrl:
    :vartype EndpointUrl: String
    :ivar Server:
    :vartype Server: ApplicationDescription
    :ivar ServerCertificate:
    :vartype ServerCertificate: ByteString
    :ivar SecurityMode:
    :vartype SecurityMode: MessageSecurityMode
    :ivar SecurityPolicyUri:
    :vartype SecurityPolicyUri: String
    :ivar UserIdentityTokens:
    :vartype UserIdentityTokens: UserTokenPolicy
    :ivar TransportProfileUri:
    :vartype TransportProfileUri: String
    :ivar SecurityLevel:
    :vartype SecurityLevel: Byte
    '''
    def __init__(self, binary=None):
        '''
        Create an instance with default field values, or decode one from
        ``binary`` when it is given.

        :param binary: stream to decode from (``read(n)`` is called on it),
            or None to build a default-initialised instance
        '''
        if binary is not None:
            self._binary_init(binary)
            # NOTE(review): _freeze is presumably what makes FrozenClass
            # reject attributes added later -- confirm against FrozenClass.
            self._freeze = True
            return
        self.EndpointUrl = None
        self.Server = ApplicationDescription()
        self.ServerCertificate = None
        self.SecurityMode = MessageSecurityMode(0)
        self.SecurityPolicyUri = None
        self.UserIdentityTokens = []
        self.TransportProfileUri = None
        self.SecurityLevel = 0
        self._freeze = True

    def to_binary(self):
        '''
        Serialise every field, in declaration order, into one bytes object.

        :return: the concatenated binary encoding of all fields
        '''
        chunks = [
            pack_string(self.EndpointUrl),
            self.Server.to_binary(),
            pack_bytes(self.ServerCertificate),
            uatype_UInt32.pack(self.SecurityMode.value),
            pack_string(self.SecurityPolicyUri),
            # Array fields are written as an Int32 element count followed
            # by the encoded elements.
            uatype_Int32.pack(len(self.UserIdentityTokens)),
        ]
        chunks.extend(token.to_binary() for token in self.UserIdentityTokens)
        chunks.append(pack_string(self.TransportProfileUri))
        chunks.append(uatype_Byte.pack(self.SecurityLevel))
        return b''.join(chunks)

    @staticmethod
    def from_binary(data):
        '''
        Decode an EndpointDescription from ``data``.

        :param data: stream to decode from
        :return: a new EndpointDescription
        '''
        return EndpointDescription(data)

    def _binary_init(self, data):
        '''
        Populate all fields by reading them from ``data`` in declaration
        order.

        :param data: stream to decode from (``read(n)`` is called on it)
        '''
        self.EndpointUrl = unpack_string(data)
        self.Server = ApplicationDescription.from_binary(data)
        self.ServerCertificate = unpack_bytes(data)
        self.SecurityMode = MessageSecurityMode(uatype_UInt32.unpack(data.read(4))[0])
        self.SecurityPolicyUri = unpack_string(data)
        count = uatype_Int32.unpack(data.read(4))[0]
        # A length of -1 yields an empty range, so it decodes to an empty
        # list exactly like a length of 0.
        self.UserIdentityTokens = [
            UserTokenPolicy.from_binary(data) for _ in range(count)
        ]
        self.TransportProfileUri = unpack_string(data)
        self.SecurityLevel = uatype_Byte.unpack(data.read(1))[0]

    def __str__(self):
        '''
        Return ``EndpointDescription(Field:value, ...)`` listing every
        field in declaration order.
        '''
        field_names = (
            'EndpointUrl',
            'Server',
            'ServerCertificate',
            'SecurityMode',
            'SecurityPolicyUri',
            'UserIdentityTokens',
            'TransportProfileUri',
            'SecurityLevel',
        )
        rendered = ', '.join(
            name + ':' + str(getattr(self, name)) for name in field_names
        )
        return 'EndpointDescription(' + rendered + ')'

    __repr__ = __str__
1851
1852