Code Duplication    Length = 46-48 lines in 4 locations

opcua/ua/uaprotocol_auto.py 4 locations

@@ 2162-2209 (lines=48) @@
2159
        ('ServerType', 'ApplicationType'),
2160
        ('GatewayServerUri', 'String'),
2161
        ('DiscoveryUrls', 'ListOfString'),
2162
        ('SemaphoreFilePath', 'String'),
2163
        ('IsOnline', 'Boolean'),
2164
               ]
2165
2166
    def __init__(self):
        '''
        Create an instance with every field set to its default value.

        String fields start as None, list fields as fresh empty lists,
        ServerType as ApplicationType(0) and IsOnline as True.
        '''
        defaults = (
            ('ServerUri', None),
            ('ProductUri', None),
            ('ServerNames', []),
            ('ServerType', ApplicationType(0)),
            ('GatewayServerUri', None),
            ('DiscoveryUrls', []),
            ('SemaphoreFilePath', None),
            ('IsOnline', True),
        )
        for field, value in defaults:
            setattr(self, field, value)
        # Kept last: presumably the base class refuses new attributes once
        # _freeze is True -- confirm against the base class definition.
        self._freeze = True
2176
2177
    def to_binary(self):
        '''
        Serialize this object to bytes.

        Fields are packed in declaration order. Each list field is written
        as an Int32 element count followed by its packed elements.

        :return: the concatenated packed fields
        '''
        prims = uabin.Primitives
        chunks = [
            prims.String.pack(self.ServerUri),
            prims.String.pack(self.ProductUri),
            prims.Int32.pack(len(self.ServerNames)),
        ]
        # ServerNames elements serialize themselves.
        chunks.extend(name.to_binary() for name in self.ServerNames)
        # The enum is written through its numeric value.
        chunks.append(prims.UInt32.pack(self.ServerType.value))
        chunks.append(prims.String.pack(self.GatewayServerUri))
        chunks.append(prims.Int32.pack(len(self.DiscoveryUrls)))
        chunks.extend(prims.String.pack(url) for url in self.DiscoveryUrls)
        chunks.append(prims.String.pack(self.SemaphoreFilePath))
        chunks.append(prims.Boolean.pack(self.IsOnline))
        return b''.join(chunks)
2192
2193
    @staticmethod
    def from_binary(data):
        '''
        Build a RegisteredServer from its binary representation.

        Fields are read in the same order to_binary() writes them.

        :param data: source handed to each uabin unpacker in turn
            (presumably a stateful buffer that advances on every read --
            confirm against uabin)
        :return: a new, populated RegisteredServer
        '''
        # This is a staticmethod, so there is no `self`; every decoded field
        # must be stored on the freshly created `obj` (assigning to `self`
        # raised NameError).
        obj = RegisteredServer()
        obj.ServerUri = uabin.Primitives.String.unpack(data)
        obj.ProductUri = uabin.Primitives.String.unpack(data)
        length = uabin.Primitives.Int32.unpack(data)
        # A length of -1 (presumably the encoding of a null array) produces
        # an empty list, because range() of a negative count is empty.
        obj.ServerNames = [LocalizedText.from_binary(data) for _ in range(length)]
        obj.ServerType = ApplicationType(uabin.Primitives.UInt32.unpack(data))
        obj.GatewayServerUri = uabin.Primitives.String.unpack(data)
        obj.DiscoveryUrls = uabin.Primitives.String.unpack_array(data)
        obj.SemaphoreFilePath = uabin.Primitives.String.unpack(data)
        obj.IsOnline = uabin.Primitives.Boolean.unpack(data)
        return obj
2210
2211
    def __str__(self):
2212
        return 'RegisteredServer(' + 'ServerUri:' + str(self.ServerUri) + ', ' + \
@@ 1616-1663 (lines=48) @@
1613
        ('ServerCapabilities', 'ListOfString'),
1614
               ]
1615
1616
    def __init__(self):
        '''
        Create an instance with every field set to its default value.

        RecordId starts at 0, the two string fields as None and
        ServerCapabilities as a fresh empty list.
        '''
        defaults = (
            ('RecordId', 0),
            ('ServerName', None),
            ('DiscoveryUrl', None),
            ('ServerCapabilities', []),
        )
        for field, value in defaults:
            setattr(self, field, value)
        # Kept last: presumably the base class refuses new attributes once
        # _freeze is True -- confirm against the base class definition.
        self._freeze = True
1622
1623
    def to_binary(self):
        '''
        Serialize this object to bytes.

        Fields are packed in declaration order. ServerCapabilities is
        written as an Int32 element count followed by each packed string.

        :return: the concatenated packed fields
        '''
        pack_string = uabin.Primitives.String.pack
        chunks = [
            uabin.Primitives.UInt32.pack(self.RecordId),
            pack_string(self.ServerName),
            pack_string(self.DiscoveryUrl),
            uabin.Primitives.Int32.pack(len(self.ServerCapabilities)),
        ]
        chunks.extend(pack_string(cap) for cap in self.ServerCapabilities)
        return b''.join(chunks)
1632
1633
    @staticmethod
    def from_binary(data):
        '''
        Build a ServerOnNetwork from its binary representation.

        Fields are read in the same order to_binary() writes them.

        :param data: source handed to each uabin unpacker in turn
            (presumably a stateful buffer that advances on every read --
            confirm against uabin)
        :return: a new, populated ServerOnNetwork
        '''
        # This is a staticmethod, so there is no `self`; every decoded field
        # must be stored on the freshly created `obj` (assigning to `self`
        # raised NameError).
        obj = ServerOnNetwork()
        obj.RecordId = uabin.Primitives.UInt32.unpack(data)
        obj.ServerName = uabin.Primitives.String.unpack(data)
        obj.DiscoveryUrl = uabin.Primitives.String.unpack(data)
        obj.ServerCapabilities = uabin.Primitives.String.unpack_array(data)
        return obj
1641
1642
    def __str__(self):
        '''
        Return a one-line dump of the form
        ``ServerOnNetwork(Name:value, Name:value, ...)``.
        '''
        fields = (
            ('RecordId', self.RecordId),
            ('ServerName', self.ServerName),
            ('DiscoveryUrl', self.DiscoveryUrl),
            ('ServerCapabilities', self.ServerCapabilities),
        )
        body = ', '.join([label + ':' + str(value) for label, value in fields])
        return 'ServerOnNetwork(' + body + ')'
1647
1648
    # repr() yields exactly the same text as str().
    __repr__ = __str__
1649
1650
1651
class FindServersOnNetworkParameters(FrozenClass):
1652
    '''
1653
    :ivar StartingRecordId:
1654
    :vartype StartingRecordId: UInt32
1655
    :ivar MaxRecordsToReturn:
1656
    :vartype MaxRecordsToReturn: UInt32
1657
    :ivar ServerCapabilityFilter:
1658
    :vartype ServerCapabilityFilter: String
1659
    '''
1660
1661
    ua_types = [
1662
        ('StartingRecordId', 'UInt32'),
1663
        ('MaxRecordsToReturn', 'UInt32'),
1664
        ('ServerCapabilityFilter', 'ListOfString'),
1665
               ]
1666
@@ 11338-11383 (lines=46) @@
11335
            for _ in range(0, length):
11336
                array.append(StatusCode.from_binary(data))
11337
        obj.RemoveResults = array
11338
        length = uabin.Primitives.Int32.unpack(data)
11339
        array = []
11340
        if length != -1:
11341
            for _ in range(0, length):
11342
                array.append(DiagnosticInfo.from_binary(data))
11343
        obj.RemoveDiagnosticInfos = array
11344
        return obj
11345
11346
    def __str__(self):
        '''
        Return a one-line dump of the form
        ``SetTriggeringResult(Name:value, Name:value, ...)``.
        '''
        fields = (
            ('AddResults', self.AddResults),
            ('AddDiagnosticInfos', self.AddDiagnosticInfos),
            ('RemoveResults', self.RemoveResults),
            ('RemoveDiagnosticInfos', self.RemoveDiagnosticInfos),
        )
        body = ', '.join([label + ':' + str(value) for label, value in fields])
        return 'SetTriggeringResult(' + body + ')'
11351
11352
    # repr() yields exactly the same text as str().
    __repr__ = __str__
11353
11354
11355
class SetTriggeringResponse(FrozenClass):
11356
    '''
11357
    :ivar TypeId:
11358
    :vartype TypeId: NodeId
11359
    :ivar ResponseHeader:
11360
    :vartype ResponseHeader: ResponseHeader
11361
    :ivar Parameters:
11362
    :vartype Parameters: SetTriggeringResult
11363
    '''
11364
11365
    ua_types = [
11366
        ('TypeId', 'NodeId'),
11367
        ('ResponseHeader', 'ResponseHeader'),
11368
        ('Parameters', 'SetTriggeringResult'),
11369
               ]
11370
11371
    def __init__(self):
        '''
        Create a response with default sub-objects.

        TypeId is the four-byte node id built from
        ObjectIds.SetTriggeringResponse_Encoding_DefaultBinary;
        ResponseHeader and Parameters are freshly constructed defaults.
        '''
        encoding_id = ObjectIds.SetTriggeringResponse_Encoding_DefaultBinary
        self.TypeId = FourByteNodeId(encoding_id)
        self.ResponseHeader = ResponseHeader()
        self.Parameters = SetTriggeringResult()
        # Kept last: presumably FrozenClass refuses new attributes once
        # _freeze is True -- confirm against FrozenClass.
        self._freeze = True
11376
11377
    def to_binary(self):
        '''
        Serialize this response to bytes.

        Each member serializes itself; the results are concatenated in the
        order TypeId, ResponseHeader, Parameters.

        :return: the concatenated binary form of the three members
        '''
        members = (self.TypeId, self.ResponseHeader, self.Parameters)
        return b''.join([member.to_binary() for member in members])
11383
11384
    @staticmethod
11385
    def from_binary(data):
11386
        obj = SetTriggeringResponse()
@@ 1824-1869 (lines=46) @@
1821
        obj.ResponseHeader = ResponseHeader.from_binary(data)
1822
        obj.Parameters = FindServersOnNetworkResult.from_binary(data)
1823
        return obj
1824
1825
    def __str__(self):
        '''
        Return a one-line dump of the form
        ``FindServersOnNetworkResponse(Name:value, Name:value, ...)``.
        '''
        fields = (
            ('TypeId', self.TypeId),
            ('ResponseHeader', self.ResponseHeader),
            ('Parameters', self.Parameters),
        )
        body = ', '.join([label + ':' + str(value) for label, value in fields])
        return 'FindServersOnNetworkResponse(' + body + ')'
1829
1830
    # repr() yields exactly the same text as str().
    __repr__ = __str__
1831
1832
1833
class UserTokenPolicy(FrozenClass):
1834
    '''
1835
    Describes a user token that can be used with a server.
1836
1837
    :ivar PolicyId:
1838
    :vartype PolicyId: String
1839
    :ivar TokenType:
1840
    :vartype TokenType: UserTokenType
1841
    :ivar IssuedTokenType:
1842
    :vartype IssuedTokenType: String
1843
    :ivar IssuerEndpointUrl:
1844
    :vartype IssuerEndpointUrl: String
1845
    :ivar SecurityPolicyUri:
1846
    :vartype SecurityPolicyUri: String
1847
    '''
1848
1849
    ua_types = [
1850
        ('PolicyId', 'String'),
1851
        ('TokenType', 'UserTokenType'),
1852
        ('IssuedTokenType', 'String'),
1853
        ('IssuerEndpointUrl', 'String'),
1854
        ('SecurityPolicyUri', 'String'),
1855
               ]
1856
1857
    def __init__(self):
        '''
        Create an instance with every field set to its default value.

        All string fields start as None and TokenType as UserTokenType(0).
        '''
        defaults = (
            ('PolicyId', None),
            ('TokenType', UserTokenType(0)),
            ('IssuedTokenType', None),
            ('IssuerEndpointUrl', None),
            ('SecurityPolicyUri', None),
        )
        for field, value in defaults:
            setattr(self, field, value)
        # Kept last: presumably FrozenClass refuses new attributes once
        # _freeze is True -- confirm against FrozenClass.
        self._freeze = True
1864
1865
    def to_binary(self):
        '''
        Serialize this object to bytes.

        Fields are packed in declaration order; TokenType is written
        through its numeric enum value as a UInt32.

        :return: the concatenated packed fields
        '''
        pack_string = uabin.Primitives.String.pack
        return b''.join([
            pack_string(self.PolicyId),
            uabin.Primitives.UInt32.pack(self.TokenType.value),
            pack_string(self.IssuedTokenType),
            pack_string(self.IssuerEndpointUrl),
            pack_string(self.SecurityPolicyUri),
        ])