Code Duplication    Length = 41-48 lines in 12 locations

opcua/ua/uaprotocol_auto.py 12 locations

@@ 5972-6015 (lines=44) @@
5969
    __repr__ = __str__
5970
5971
5972
class BrowsePathResult(FrozenClass):
    '''
    The result of a translate operation.

    :ivar StatusCode: written first, via its own to_binary/from_binary.
    :vartype StatusCode: StatusCode
    :ivar Targets: list written as an Int32 count followed by each element.
    :vartype Targets: BrowsePathTarget
    '''
    def __init__(self, binary=None):
        # Fields come either from defaults or from the binary stream;
        # _freeze is only set once every field has been assigned.
        if binary is None:
            self.StatusCode = StatusCode()
            self.Targets = []
        else:
            self._binary_init(binary)
        self._freeze = True

    def to_binary(self):
        # Same field order as _binary_init reads them back.
        chunks = [
            self.StatusCode.to_binary(),
            uatype_Int32.pack(len(self.Targets)),
        ]
        chunks.extend(target.to_binary() for target in self.Targets)
        return b''.join(chunks)

    @staticmethod
    def from_binary(data):
        return BrowsePathResult(data)

    def _binary_init(self, data):
        self.StatusCode = StatusCode.from_binary(data)
        count = uatype_Int32.unpack(data.read(4))[0]
        # A count of -1 produces an empty list.
        if count == -1:
            self.Targets = []
        else:
            self.Targets = [BrowsePathTarget.from_binary(data) for _ in range(0, count)]

    def __str__(self):
        fields = [
            'StatusCode:' + str(self.StatusCode),
            'Targets:' + str(self.Targets),
        ]
        return 'BrowsePathResult(' + ', '.join(fields) + ')'

    __repr__ = __str__
6016
6017
6018
class TranslateBrowsePathsToNodeIdsParameters(FrozenClass):
@@ 10427-10469 (lines=43) @@
10424
    __repr__ = __str__
10425
10426
10427
class SetMonitoringModeParameters(FrozenClass):
    '''
    :ivar SubscriptionId: packed as UInt32; defaults to 0.
    :vartype SubscriptionId: UInt32
    :ivar MonitoringMode: enum whose ``.value`` is packed as UInt32.
    :vartype MonitoringMode: MonitoringMode
    :ivar MonitoredItemIds: list written as an Int32 count followed by
        one UInt32 per element.
    :vartype MonitoredItemIds: UInt32
    '''
    def __init__(self, binary=None):
        # Fields come either from defaults or from the binary stream;
        # _freeze is only set once every field has been assigned.
        if binary is None:
            self.SubscriptionId = 0
            self.MonitoringMode = MonitoringMode(0)
            self.MonitoredItemIds = []
        else:
            self._binary_init(binary)
        self._freeze = True

    def to_binary(self):
        # Same field order as _binary_init reads them back.
        chunks = [
            uatype_UInt32.pack(self.SubscriptionId),
            uatype_UInt32.pack(self.MonitoringMode.value),
            uatype_Int32.pack(len(self.MonitoredItemIds)),
        ]
        chunks.extend(uatype_UInt32.pack(item_id) for item_id in self.MonitoredItemIds)
        return b''.join(chunks)

    @staticmethod
    def from_binary(data):
        return SetMonitoringModeParameters(data)

    def _binary_init(self, data):
        raw_id = data.read(4)
        self.SubscriptionId = uatype_UInt32.unpack(raw_id)[0]
        raw_mode = data.read(4)
        self.MonitoringMode = MonitoringMode(uatype_UInt32.unpack(raw_mode)[0])
        self.MonitoredItemIds = unpack_uatype_array('UInt32', data)

    def __str__(self):
        fields = [
            'SubscriptionId:' + str(self.SubscriptionId),
            'MonitoringMode:' + str(self.MonitoringMode),
            'MonitoredItemIds:' + str(self.MonitoredItemIds),
        ]
        return 'SetMonitoringModeParameters(' + ', '.join(fields) + ')'

    __repr__ = __str__
10470
10471
10472
class SetMonitoringModeRequest(FrozenClass):
@@ 11735-11776 (lines=42) @@
11732
    __repr__ = __str__
11733
11734
11735
class EventFieldList(FrozenClass):
    '''
    :ivar ClientHandle: packed as UInt32; defaults to 0.
    :vartype ClientHandle: UInt32
    :ivar EventFields: list written as an Int32 count followed by each element.
    :vartype EventFields: Variant
    '''
    def __init__(self, binary=None):
        # Fields come either from defaults or from the binary stream;
        # _freeze is only set once every field has been assigned.
        if binary is None:
            self.ClientHandle = 0
            self.EventFields = []
        else:
            self._binary_init(binary)
        self._freeze = True

    def to_binary(self):
        # Same field order as _binary_init reads them back.
        chunks = [
            uatype_UInt32.pack(self.ClientHandle),
            uatype_Int32.pack(len(self.EventFields)),
        ]
        chunks.extend(field.to_binary() for field in self.EventFields)
        return b''.join(chunks)

    @staticmethod
    def from_binary(data):
        return EventFieldList(data)

    def _binary_init(self, data):
        self.ClientHandle = uatype_UInt32.unpack(data.read(4))[0]
        count = uatype_Int32.unpack(data.read(4))[0]
        # A count of -1 produces an empty list.
        if count == -1:
            self.EventFields = []
        else:
            self.EventFields = [Variant.from_binary(data) for _ in range(0, count)]

    def __str__(self):
        fields = [
            'ClientHandle:' + str(self.ClientHandle),
            'EventFields:' + str(self.EventFields),
        ]
        return 'EventFieldList(' + ', '.join(fields) + ')'

    __repr__ = __str__
11777
11778
11779
class HistoryEventFieldList(FrozenClass):
@@ 9601-9642 (lines=42) @@
9598
    __repr__ = __str__
9599
9600
9601
class EventFilter(FrozenClass):
    '''
    :ivar SelectClauses: list written as an Int32 count followed by each element.
    :vartype SelectClauses: SimpleAttributeOperand
    :ivar WhereClause: written after the select clauses, via its own
        to_binary/from_binary.
    :vartype WhereClause: ContentFilter
    '''
    def __init__(self, binary=None):
        # Fields come either from defaults or from the binary stream;
        # _freeze is only set once every field has been assigned.
        if binary is None:
            self.SelectClauses = []
            self.WhereClause = ContentFilter()
        else:
            self._binary_init(binary)
        self._freeze = True

    def to_binary(self):
        # Same field order as _binary_init reads them back.
        chunks = [uatype_Int32.pack(len(self.SelectClauses))]
        chunks.extend(clause.to_binary() for clause in self.SelectClauses)
        chunks.append(self.WhereClause.to_binary())
        return b''.join(chunks)

    @staticmethod
    def from_binary(data):
        return EventFilter(data)

    def _binary_init(self, data):
        count = uatype_Int32.unpack(data.read(4))[0]
        # A count of -1 produces an empty list.
        if count == -1:
            self.SelectClauses = []
        else:
            self.SelectClauses = [SimpleAttributeOperand.from_binary(data) for _ in range(0, count)]
        self.WhereClause = ContentFilter.from_binary(data)

    def __str__(self):
        fields = [
            'SelectClauses:' + str(self.SelectClauses),
            'WhereClause:' + str(self.WhereClause),
        ]
        return 'EventFilter(' + ', '.join(fields) + ')'

    __repr__ = __str__
9643
9644
9645
class AggregateConfiguration(FrozenClass):
@@ 6887-6928 (lines=42) @@
6884
    __repr__ = __str__
6885
6886
6887
class ContentFilterElement(FrozenClass):
    '''
    :ivar FilterOperator: enum whose ``.value`` is packed as UInt32.
    :vartype FilterOperator: FilterOperator
    :ivar FilterOperands: list written as an Int32 count followed by each
        element passed through extensionobject_to_binary.
    :vartype FilterOperands: ExtensionObject
    '''
    def __init__(self, binary=None):
        # Fields come either from defaults or from the binary stream;
        # _freeze is only set once every field has been assigned.
        if binary is None:
            self.FilterOperator = FilterOperator(0)
            self.FilterOperands = []
        else:
            self._binary_init(binary)
        self._freeze = True

    def to_binary(self):
        # Same field order as _binary_init reads them back.
        chunks = [
            uatype_UInt32.pack(self.FilterOperator.value),
            uatype_Int32.pack(len(self.FilterOperands)),
        ]
        chunks.extend(extensionobject_to_binary(operand) for operand in self.FilterOperands)
        return b''.join(chunks)

    @staticmethod
    def from_binary(data):
        return ContentFilterElement(data)

    def _binary_init(self, data):
        raw_operator = data.read(4)
        self.FilterOperator = FilterOperator(uatype_UInt32.unpack(raw_operator)[0])
        count = uatype_Int32.unpack(data.read(4))[0]
        # A count of -1 produces an empty list.
        if count == -1:
            self.FilterOperands = []
        else:
            self.FilterOperands = [extensionobject_from_binary(data) for _ in range(0, count)]

    def __str__(self):
        fields = [
            'FilterOperator:' + str(self.FilterOperator),
            'FilterOperands:' + str(self.FilterOperands),
        ]
        return 'ContentFilterElement(' + ', '.join(fields) + ')'

    __repr__ = __str__
6929
6930
6931
class ContentFilter(FrozenClass):
@@ 2234-2275 (lines=42) @@
2231
    __repr__ = __str__
2232
2233
2234
class RegisterServer2Parameters(FrozenClass):
    '''
    :ivar Server: written first, via its own to_binary/from_binary.
    :vartype Server: RegisteredServer
    :ivar DiscoveryConfiguration: list written as an Int32 count followed by
        each element passed through extensionobject_to_binary.
    :vartype DiscoveryConfiguration: ExtensionObject
    '''
    def __init__(self, binary=None):
        # Fields come either from defaults or from the binary stream;
        # _freeze is only set once every field has been assigned.
        if binary is None:
            self.Server = RegisteredServer()
            self.DiscoveryConfiguration = []
        else:
            self._binary_init(binary)
        self._freeze = True

    def to_binary(self):
        # Same field order as _binary_init reads them back.
        chunks = [
            self.Server.to_binary(),
            uatype_Int32.pack(len(self.DiscoveryConfiguration)),
        ]
        chunks.extend(extensionobject_to_binary(config) for config in self.DiscoveryConfiguration)
        return b''.join(chunks)

    @staticmethod
    def from_binary(data):
        return RegisterServer2Parameters(data)

    def _binary_init(self, data):
        self.Server = RegisteredServer.from_binary(data)
        count = uatype_Int32.unpack(data.read(4))[0]
        # A count of -1 produces an empty list.
        if count == -1:
            self.DiscoveryConfiguration = []
        else:
            self.DiscoveryConfiguration = [extensionobject_from_binary(data) for _ in range(0, count)]

    def __str__(self):
        fields = [
            'Server:' + str(self.Server),
            'DiscoveryConfiguration:' + str(self.DiscoveryConfiguration),
        ]
        return 'RegisterServer2Parameters(' + ', '.join(fields) + ')'

    __repr__ = __str__
2276
2277
2278
class RegisterServer2Request(FrozenClass):
@@ 11532-11579 (lines=48) @@
11529
    __repr__ = __str__
11530
11531
11532
class NotificationMessage(FrozenClass):
    '''
    :ivar SequenceNumber: packed as UInt32; defaults to 0.
    :vartype SequenceNumber: UInt32
    :ivar PublishTime: written with pack_datetime; defaults to datetime.now().
    :vartype PublishTime: DateTime
    :ivar NotificationData: list written as an Int32 count followed by each
        element passed through extensionobject_to_binary.
    :vartype NotificationData: ExtensionObject
    '''
    def __init__(self, binary=None):
        # Fields come either from defaults or from the binary stream;
        # _freeze is only set once every field has been assigned.
        if binary is None:
            self.SequenceNumber = 0
            self.PublishTime = datetime.now()
            self.NotificationData = []
        else:
            self._binary_init(binary)
        self._freeze = True

    def to_binary(self):
        # Same field order as _binary_init reads them back.
        chunks = [
            uatype_UInt32.pack(self.SequenceNumber),
            pack_datetime(self.PublishTime),
            uatype_Int32.pack(len(self.NotificationData)),
        ]
        chunks.extend(extensionobject_to_binary(item) for item in self.NotificationData)
        return b''.join(chunks)

    @staticmethod
    def from_binary(data):
        return NotificationMessage(data)

    def _binary_init(self, data):
        self.SequenceNumber = uatype_UInt32.unpack(data.read(4))[0]
        self.PublishTime = unpack_datetime(data)
        count = uatype_Int32.unpack(data.read(4))[0]
        # A count of -1 produces an empty list.
        if count == -1:
            self.NotificationData = []
        else:
            self.NotificationData = [extensionobject_from_binary(data) for _ in range(0, count)]

    def __str__(self):
        fields = [
            'SequenceNumber:' + str(self.SequenceNumber),
            'PublishTime:' + str(self.PublishTime),
            'NotificationData:' + str(self.NotificationData),
        ]
        return 'NotificationMessage(' + ', '.join(fields) + ')'

    __repr__ = __str__
11580
11581
11582
class NotificationData(FrozenClass):
@@ 1538-1580 (lines=43) @@
1535
    __repr__ = __str__
1536
1537
1538
class FindServersOnNetworkParameters(FrozenClass):
    '''
    :ivar StartingRecordId: packed as UInt32; defaults to 0.
    :vartype StartingRecordId: UInt32
    :ivar MaxRecordsToReturn: packed as UInt32; defaults to 0.
    :vartype MaxRecordsToReturn: UInt32
    :ivar ServerCapabilityFilter: list written as an Int32 count followed by
        each element passed through pack_string.
    :vartype ServerCapabilityFilter: String
    '''
    def __init__(self, binary=None):
        # Fields come either from defaults or from the binary stream;
        # _freeze is only set once every field has been assigned.
        if binary is None:
            self.StartingRecordId = 0
            self.MaxRecordsToReturn = 0
            self.ServerCapabilityFilter = []
        else:
            self._binary_init(binary)
        self._freeze = True

    def to_binary(self):
        # Same field order as _binary_init reads them back.
        chunks = [
            uatype_UInt32.pack(self.StartingRecordId),
            uatype_UInt32.pack(self.MaxRecordsToReturn),
            uatype_Int32.pack(len(self.ServerCapabilityFilter)),
        ]
        chunks.extend(pack_string(capability) for capability in self.ServerCapabilityFilter)
        return b''.join(chunks)

    @staticmethod
    def from_binary(data):
        return FindServersOnNetworkParameters(data)

    def _binary_init(self, data):
        raw_start = data.read(4)
        self.StartingRecordId = uatype_UInt32.unpack(raw_start)[0]
        raw_max = data.read(4)
        self.MaxRecordsToReturn = uatype_UInt32.unpack(raw_max)[0]
        self.ServerCapabilityFilter = unpack_uatype_array('String', data)

    def __str__(self):
        fields = [
            'StartingRecordId:' + str(self.StartingRecordId),
            'MaxRecordsToReturn:' + str(self.MaxRecordsToReturn),
            'ServerCapabilityFilter:' + str(self.ServerCapabilityFilter),
        ]
        return 'FindServersOnNetworkParameters(' + ', '.join(fields) + ')'

    __repr__ = __str__
1581
1582
1583
class FindServersOnNetworkRequest(FrozenClass):
@@ 12706-12747 (lines=42) @@
12703
    __repr__ = __str__
12704
12705
12706
class NetworkGroupDataType(FrozenClass):
    '''
    :ivar ServerUri: written with pack_string; defaults to None.
    :vartype ServerUri: String
    :ivar NetworkPaths: list written as an Int32 count followed by each element.
    :vartype NetworkPaths: EndpointUrlListDataType
    '''
    def __init__(self, binary=None):
        # Fields come either from defaults or from the binary stream;
        # _freeze is only set once every field has been assigned.
        if binary is None:
            self.ServerUri = None
            self.NetworkPaths = []
        else:
            self._binary_init(binary)
        self._freeze = True

    def to_binary(self):
        # Same field order as _binary_init reads them back.
        chunks = [
            pack_string(self.ServerUri),
            uatype_Int32.pack(len(self.NetworkPaths)),
        ]
        chunks.extend(path.to_binary() for path in self.NetworkPaths)
        return b''.join(chunks)

    @staticmethod
    def from_binary(data):
        return NetworkGroupDataType(data)

    def _binary_init(self, data):
        self.ServerUri = unpack_string(data)
        count = uatype_Int32.unpack(data.read(4))[0]
        # A count of -1 produces an empty list.
        if count == -1:
            self.NetworkPaths = []
        else:
            self.NetworkPaths = [EndpointUrlListDataType.from_binary(data) for _ in range(0, count)]

    def __str__(self):
        fields = [
            'ServerUri:' + str(self.ServerUri),
            'NetworkPaths:' + str(self.NetworkPaths),
        ]
        return 'NetworkGroupDataType(' + ', '.join(fields) + ')'

    __repr__ = __str__
12748
12749
12750
class SamplingIntervalDiagnosticsDataType(FrozenClass):
@@ 7611-7652 (lines=42) @@
7608
    __repr__ = __str__
7609
7610
7611
class QueryNextResult(FrozenClass):
    '''
    :ivar QueryDataSets: list written as an Int32 count followed by each element.
    :vartype QueryDataSets: QueryDataSet
    :ivar RevisedContinuationPoint: written with pack_bytes after the data
        sets; defaults to None.
    :vartype RevisedContinuationPoint: ByteString
    '''
    def __init__(self, binary=None):
        # Fields come either from defaults or from the binary stream;
        # _freeze is only set once every field has been assigned.
        if binary is None:
            self.QueryDataSets = []
            self.RevisedContinuationPoint = None
        else:
            self._binary_init(binary)
        self._freeze = True

    def to_binary(self):
        # Same field order as _binary_init reads them back.
        chunks = [uatype_Int32.pack(len(self.QueryDataSets))]
        chunks.extend(data_set.to_binary() for data_set in self.QueryDataSets)
        chunks.append(pack_bytes(self.RevisedContinuationPoint))
        return b''.join(chunks)

    @staticmethod
    def from_binary(data):
        return QueryNextResult(data)

    def _binary_init(self, data):
        count = uatype_Int32.unpack(data.read(4))[0]
        # A count of -1 produces an empty list.
        if count == -1:
            self.QueryDataSets = []
        else:
            self.QueryDataSets = [QueryDataSet.from_binary(data) for _ in range(0, count)]
        self.RevisedContinuationPoint = unpack_bytes(data)

    def __str__(self):
        fields = [
            'QueryDataSets:' + str(self.QueryDataSets),
            'RevisedContinuationPoint:' + str(self.RevisedContinuationPoint),
        ]
        return 'QueryNextResult(' + ', '.join(fields) + ')'

    __repr__ = __str__
7653
7654
7655
class QueryNextResponse(FrozenClass):
@@ 1626-1667 (lines=42) @@
1623
    __repr__ = __str__
1624
1625
1626
class FindServersOnNetworkResult(FrozenClass):
    '''
    :ivar LastCounterResetTime: written with pack_datetime; defaults to
        datetime.now().
    :vartype LastCounterResetTime: DateTime
    :ivar Servers: list written as an Int32 count followed by each element.
    :vartype Servers: ServerOnNetwork
    '''
    def __init__(self, binary=None):
        # Fields come either from defaults or from the binary stream;
        # _freeze is only set once every field has been assigned.
        if binary is None:
            self.LastCounterResetTime = datetime.now()
            self.Servers = []
        else:
            self._binary_init(binary)
        self._freeze = True

    def to_binary(self):
        # Same field order as _binary_init reads them back.
        chunks = [
            pack_datetime(self.LastCounterResetTime),
            uatype_Int32.pack(len(self.Servers)),
        ]
        chunks.extend(server.to_binary() for server in self.Servers)
        return b''.join(chunks)

    @staticmethod
    def from_binary(data):
        return FindServersOnNetworkResult(data)

    def _binary_init(self, data):
        self.LastCounterResetTime = unpack_datetime(data)
        count = uatype_Int32.unpack(data.read(4))[0]
        # A count of -1 produces an empty list.
        if count == -1:
            self.Servers = []
        else:
            self.Servers = [ServerOnNetwork.from_binary(data) for _ in range(0, count)]

    def __str__(self):
        fields = [
            'LastCounterResetTime:' + str(self.LastCounterResetTime),
            'Servers:' + str(self.Servers),
        ]
        return 'FindServersOnNetworkResult(' + ', '.join(fields) + ')'

    __repr__ = __str__
1668
1669
1670
class FindServersOnNetworkResponse(FrozenClass):
@@ 9558-9598 (lines=41) @@
9555
    __repr__ = __str__
9556
9557
9558
class DataChangeFilter(FrozenClass):
    '''
    :ivar Trigger: enum whose ``.value`` is packed as UInt32.
    :vartype Trigger: DataChangeTrigger
    :ivar DeadbandType: packed as UInt32; defaults to 0.
    :vartype DeadbandType: UInt32
    :ivar DeadbandValue: packed as Double (8 bytes are read back); defaults to 0.
    :vartype DeadbandValue: Double
    '''
    def __init__(self, binary=None):
        # Fields come either from defaults or from the binary stream;
        # _freeze is only set once every field has been assigned.
        if binary is None:
            self.Trigger = DataChangeTrigger(0)
            self.DeadbandType = 0
            self.DeadbandValue = 0
        else:
            self._binary_init(binary)
        self._freeze = True

    def to_binary(self):
        # Same field order as _binary_init reads them back.
        chunks = [
            uatype_UInt32.pack(self.Trigger.value),
            uatype_UInt32.pack(self.DeadbandType),
            uatype_Double.pack(self.DeadbandValue),
        ]
        return b''.join(chunks)

    @staticmethod
    def from_binary(data):
        return DataChangeFilter(data)

    def _binary_init(self, data):
        raw_trigger = data.read(4)
        self.Trigger = DataChangeTrigger(uatype_UInt32.unpack(raw_trigger)[0])
        raw_type = data.read(4)
        self.DeadbandType = uatype_UInt32.unpack(raw_type)[0]
        raw_value = data.read(8)
        self.DeadbandValue = uatype_Double.unpack(raw_value)[0]

    def __str__(self):
        fields = [
            'Trigger:' + str(self.Trigger),
            'DeadbandType:' + str(self.DeadbandType),
            'DeadbandValue:' + str(self.DeadbandValue),
        ]
        return 'DataChangeFilter(' + ', '.join(fields) + ')'

    __repr__ = __str__
9599
9600
9601
class EventFilter(FrozenClass):