Code Duplication    Length = 55-59 lines in 6 locations

opcua/ua/uaprotocol_auto.py 6 locations

@@ 12070-12128 (lines=59) @@
12067
        'Parameters': 'SetMonitoringModeResult',
12068
               }
12069
12070
    def __init__(self, binary=None):
12071
        if binary is not None:
12072
            self._binary_init(binary)
12073
            self._freeze = True
12074
            return
12075
        self.TypeId = FourByteNodeId(ObjectIds.SetMonitoringModeResponse_Encoding_DefaultBinary)
12076
        self.ResponseHeader = ResponseHeader()
12077
        self.Parameters = SetMonitoringModeResult()
12078
        self._freeze = True
12079
12080
    def to_binary(self):
12081
        packet = []
12082
        packet.append(self.TypeId.to_binary())
12083
        packet.append(self.ResponseHeader.to_binary())
12084
        packet.append(self.Parameters.to_binary())
12085
        return b''.join(packet)
12086
12087
    @staticmethod
12088
    def from_binary(data):
12089
        return SetMonitoringModeResponse(data)
12090
12091
    def _binary_init(self, data):
12092
        self.TypeId = NodeId.from_binary(data)
12093
        self.ResponseHeader = ResponseHeader.from_binary(data)
12094
        self.Parameters = SetMonitoringModeResult.from_binary(data)
12095
12096
    def __str__(self):
12097
        return 'SetMonitoringModeResponse(' + 'TypeId:' + str(self.TypeId) + ', ' + \
12098
               'ResponseHeader:' + str(self.ResponseHeader) + ', ' + \
12099
               'Parameters:' + str(self.Parameters) + ')'
12100
12101
    __repr__ = __str__
12102
12103
12104
class SetTriggeringParameters(FrozenClass):
12105
    '''
12106
    :ivar SubscriptionId:
12107
    :vartype SubscriptionId: UInt32
12108
    :ivar TriggeringItemId:
12109
    :vartype TriggeringItemId: UInt32
12110
    :ivar LinksToAdd:
12111
    :vartype LinksToAdd: UInt32
12112
    :ivar LinksToRemove:
12113
    :vartype LinksToRemove: UInt32
12114
    '''
12115
12116
    ua_types = {
12117
        'SubscriptionId': 'UInt32',
12118
        'TriggeringItemId': 'UInt32',
12119
        'LinksToAdd': 'UInt32',
12120
        'LinksToRemove': 'UInt32',
12121
               }
12122
12123
    def __init__(self, binary=None):
12124
        if binary is not None:
12125
            self._binary_init(binary)
12126
            self._freeze = True
12127
            return
12128
        self.SubscriptionId = 0
12129
        self.TriggeringItemId = 0
12130
        self.LinksToAdd = []
12131
        self.LinksToRemove = []
@@ 1600-1656 (lines=57) @@
1597
1598
class ServerOnNetwork(FrozenClass):
1599
    '''
1600
    :ivar RecordId:
1601
    :vartype RecordId: UInt32
1602
    :ivar ServerName:
1603
    :vartype ServerName: String
1604
    :ivar DiscoveryUrl:
1605
    :vartype DiscoveryUrl: String
1606
    :ivar ServerCapabilities:
1607
    :vartype ServerCapabilities: String
1608
    '''
1609
1610
    ua_types = {
1611
        'RecordId': 'UInt32',
1612
        'ServerName': 'String',
1613
        'DiscoveryUrl': 'String',
1614
        'ServerCapabilities': 'String',
1615
               }
1616
1617
    def __init__(self, binary=None):
1618
        if binary is not None:
1619
            self._binary_init(binary)
1620
            self._freeze = True
1621
            return
1622
        self.RecordId = 0
1623
        self.ServerName = None
1624
        self.DiscoveryUrl = None
1625
        self.ServerCapabilities = []
1626
        self._freeze = True
1627
1628
    def to_binary(self):
1629
        packet = []
1630
        packet.append(uabin.Primitives.UInt32.pack(self.RecordId))
1631
        packet.append(uabin.Primitives.String.pack(self.ServerName))
1632
        packet.append(uabin.Primitives.String.pack(self.DiscoveryUrl))
1633
        packet.append(uabin.Primitives.Int32.pack(len(self.ServerCapabilities)))
1634
        for fieldname in self.ServerCapabilities:
1635
            packet.append(uabin.Primitives.String.pack(fieldname))
1636
        return b''.join(packet)
1637
1638
    @staticmethod
1639
    def from_binary(data):
1640
        return ServerOnNetwork(data)
1641
1642
    def _binary_init(self, data):
1643
        self.RecordId = uabin.Primitives.UInt32.unpack(data)
1644
        self.ServerName = uabin.Primitives.String.unpack(data)
1645
        self.DiscoveryUrl = uabin.Primitives.String.unpack(data)
1646
        self.ServerCapabilities = uabin.Primitives.String.unpack_array(data)
1647
1648
    def __str__(self):
1649
        return 'ServerOnNetwork(' + 'RecordId:' + str(self.RecordId) + ', ' + \
1650
               'ServerName:' + str(self.ServerName) + ', ' + \
1651
               'DiscoveryUrl:' + str(self.DiscoveryUrl) + ', ' + \
1652
               'ServerCapabilities:' + str(self.ServerCapabilities) + ')'
1653
1654
    __repr__ = __str__
1655
1656
1657
class FindServersOnNetworkParameters(FrozenClass):
1658
    '''
1659
    :ivar StartingRecordId:
@@ 3458-3514 (lines=57) @@
3455
3456
class UserNameIdentityToken(FrozenClass):
3457
    '''
3458
    A token representing a user identified by a user name and password.
3459
3460
    :ivar PolicyId:
3461
    :vartype PolicyId: String
3462
    :ivar UserName:
3463
    :vartype UserName: String
3464
    :ivar Password:
3465
    :vartype Password: ByteString
3466
    :ivar EncryptionAlgorithm:
3467
    :vartype EncryptionAlgorithm: String
3468
    '''
3469
3470
    ua_types = {
3471
        'PolicyId': 'String',
3472
        'UserName': 'String',
3473
        'Password': 'ByteString',
3474
        'EncryptionAlgorithm': 'String',
3475
               }
3476
3477
    def __init__(self, binary=None):
3478
        if binary is not None:
3479
            self._binary_init(binary)
3480
            self._freeze = True
3481
            return
3482
        self.PolicyId = None
3483
        self.UserName = None
3484
        self.Password = None
3485
        self.EncryptionAlgorithm = None
3486
        self._freeze = True
3487
3488
    def to_binary(self):
3489
        packet = []
3490
        packet.append(uabin.Primitives.String.pack(self.PolicyId))
3491
        packet.append(uabin.Primitives.String.pack(self.UserName))
3492
        packet.append(uabin.Primitives.ByteString.pack(self.Password))
3493
        packet.append(uabin.Primitives.String.pack(self.EncryptionAlgorithm))
3494
        return b''.join(packet)
3495
3496
    @staticmethod
3497
    def from_binary(data):
3498
        return UserNameIdentityToken(data)
3499
3500
    def _binary_init(self, data):
3501
        self.PolicyId = uabin.Primitives.String.unpack(data)
3502
        self.UserName = uabin.Primitives.String.unpack(data)
3503
        self.Password = uabin.Primitives.ByteString.unpack(data)
3504
        self.EncryptionAlgorithm = uabin.Primitives.String.unpack(data)
3505
3506
    def __str__(self):
3507
        return 'UserNameIdentityToken(' + 'PolicyId:' + str(self.PolicyId) + ', ' + \
3508
               'UserName:' + str(self.UserName) + ', ' + \
3509
               'Password:' + str(self.Password) + ', ' + \
3510
               'EncryptionAlgorithm:' + str(self.EncryptionAlgorithm) + ')'
3511
3512
    __repr__ = __str__
3513
3514
3515
class X509IdentityToken(FrozenClass):
3516
    '''
3517
    A token representing a user identified by an X509 certificate.
@@ 14531-14585 (lines=55) @@
14528
    def __init__(self, binary=None):
14529
        if binary is not None:
14530
            self._binary_init(binary)
14531
            self._freeze = True
14532
            return
14533
        self.ServerUri = None
14534
        self.NetworkPaths = []
14535
        self._freeze = True
14536
14537
    def to_binary(self):
14538
        packet = []
14539
        packet.append(uabin.Primitives.String.pack(self.ServerUri))
14540
        packet.append(uabin.Primitives.Int32.pack(len(self.NetworkPaths)))
14541
        for fieldname in self.NetworkPaths:
14542
            packet.append(fieldname.to_binary())
14543
        return b''.join(packet)
14544
14545
    @staticmethod
14546
    def from_binary(data):
14547
        return NetworkGroupDataType(data)
14548
14549
    def _binary_init(self, data):
14550
        self.ServerUri = uabin.Primitives.String.unpack(data)
14551
        length = uabin.Primitives.Int32.unpack(data)
14552
        array = []
14553
        if length != -1:
14554
            for _ in range(0, length):
14555
                array.append(EndpointUrlListDataType.from_binary(data))
14556
        self.NetworkPaths = array
14557
14558
    def __str__(self):
14559
        return 'NetworkGroupDataType(' + 'ServerUri:' + str(self.ServerUri) + ', ' + \
14560
               'NetworkPaths:' + str(self.NetworkPaths) + ')'
14561
14562
    __repr__ = __str__
14563
14564
14565
class SamplingIntervalDiagnosticsDataType(FrozenClass):
14566
    '''
14567
    :ivar SamplingInterval:
14568
    :vartype SamplingInterval: Double
14569
    :ivar MonitoredItemCount:
14570
    :vartype MonitoredItemCount: UInt32
14571
    :ivar MaxMonitoredItemCount:
14572
    :vartype MaxMonitoredItemCount: UInt32
14573
    :ivar DisabledMonitoredItemCount:
14574
    :vartype DisabledMonitoredItemCount: UInt32
14575
    '''
14576
14577
    ua_types = {
14578
        'SamplingInterval': 'Double',
14579
        'MonitoredItemCount': 'UInt32',
14580
        'MaxMonitoredItemCount': 'UInt32',
14581
        'DisabledMonitoredItemCount': 'UInt32',
14582
               }
14583
14584
    def __init__(self, binary=None):
14585
        if binary is not None:
14586
            self._binary_init(binary)
14587
            self._freeze = True
14588
            return
@@ 12603-12657 (lines=55) @@
12600
        'Parameters': 'CreateSubscriptionParameters',
12601
               }
12602
12603
    def __init__(self, binary=None):
12604
        if binary is not None:
12605
            self._binary_init(binary)
12606
            self._freeze = True
12607
            return
12608
        self.TypeId = FourByteNodeId(ObjectIds.CreateSubscriptionRequest_Encoding_DefaultBinary)
12609
        self.RequestHeader = RequestHeader()
12610
        self.Parameters = CreateSubscriptionParameters()
12611
        self._freeze = True
12612
12613
    def to_binary(self):
12614
        packet = []
12615
        packet.append(self.TypeId.to_binary())
12616
        packet.append(self.RequestHeader.to_binary())
12617
        packet.append(self.Parameters.to_binary())
12618
        return b''.join(packet)
12619
12620
    @staticmethod
12621
    def from_binary(data):
12622
        return CreateSubscriptionRequest(data)
12623
12624
    def _binary_init(self, data):
12625
        self.TypeId = NodeId.from_binary(data)
12626
        self.RequestHeader = RequestHeader.from_binary(data)
12627
        self.Parameters = CreateSubscriptionParameters.from_binary(data)
12628
12629
    def __str__(self):
12630
        return 'CreateSubscriptionRequest(' + 'TypeId:' + str(self.TypeId) + ', ' + \
12631
               'RequestHeader:' + str(self.RequestHeader) + ', ' + \
12632
               'Parameters:' + str(self.Parameters) + ')'
12633
12634
    __repr__ = __str__
12635
12636
12637
class CreateSubscriptionResult(FrozenClass):
12638
    '''
12639
    :ivar SubscriptionId:
12640
    :vartype SubscriptionId: UInt32
12641
    :ivar RevisedPublishingInterval:
12642
    :vartype RevisedPublishingInterval: Double
12643
    :ivar RevisedLifetimeCount:
12644
    :vartype RevisedLifetimeCount: UInt32
12645
    :ivar RevisedMaxKeepAliveCount:
12646
    :vartype RevisedMaxKeepAliveCount: UInt32
12647
    '''
12648
12649
    ua_types = {
12650
        'SubscriptionId': 'UInt32',
12651
        'RevisedPublishingInterval': 'Double',
12652
        'RevisedLifetimeCount': 'UInt32',
12653
        'RevisedMaxKeepAliveCount': 'UInt32',
12654
               }
12655
12656
    def __init__(self, binary=None):
12657
        if binary is not None:
12658
            self._binary_init(binary)
12659
            self._freeze = True
12660
            return
@@ 2630-2686 (lines=57) @@
2627
2628
class ChannelSecurityToken(FrozenClass):
2629
    '''
2630
    The token that identifies a set of keys for an active secure channel.
2631
2632
    :ivar ChannelId:
2633
    :vartype ChannelId: UInt32
2634
    :ivar TokenId:
2635
    :vartype TokenId: UInt32
2636
    :ivar CreatedAt:
2637
    :vartype CreatedAt: DateTime
2638
    :ivar RevisedLifetime:
2639
    :vartype RevisedLifetime: UInt32
2640
    '''
2641
2642
    ua_types = {
2643
        'ChannelId': 'UInt32',
2644
        'TokenId': 'UInt32',
2645
        'CreatedAt': 'DateTime',
2646
        'RevisedLifetime': 'UInt32',
2647
               }
2648
2649
    def __init__(self, binary=None):
2650
        if binary is not None:
2651
            self._binary_init(binary)
2652
            self._freeze = True
2653
            return
2654
        self.ChannelId = 0
2655
        self.TokenId = 0
2656
        self.CreatedAt = datetime.utcnow()
2657
        self.RevisedLifetime = 0
2658
        self._freeze = True
2659
2660
    def to_binary(self):
2661
        packet = []
2662
        packet.append(uabin.Primitives.UInt32.pack(self.ChannelId))
2663
        packet.append(uabin.Primitives.UInt32.pack(self.TokenId))
2664
        packet.append(uabin.Primitives.DateTime.pack(self.CreatedAt))
2665
        packet.append(uabin.Primitives.UInt32.pack(self.RevisedLifetime))
2666
        return b''.join(packet)
2667
2668
    @staticmethod
2669
    def from_binary(data):
2670
        return ChannelSecurityToken(data)
2671
2672
    def _binary_init(self, data):
2673
        self.ChannelId = uabin.Primitives.UInt32.unpack(data)
2674
        self.TokenId = uabin.Primitives.UInt32.unpack(data)
2675
        self.CreatedAt = uabin.Primitives.DateTime.unpack(data)
2676
        self.RevisedLifetime = uabin.Primitives.UInt32.unpack(data)
2677
2678
    def __str__(self):
2679
        return 'ChannelSecurityToken(' + 'ChannelId:' + str(self.ChannelId) + ', ' + \
2680
               'TokenId:' + str(self.TokenId) + ', ' + \
2681
               'CreatedAt:' + str(self.CreatedAt) + ', ' + \
2682
               'RevisedLifetime:' + str(self.RevisedLifetime) + ')'
2683
2684
    __repr__ = __str__
2685
2686
2687
class OpenSecureChannelParameters(FrozenClass):
2688
    '''
2689
    :ivar ClientProtocolVersion: