Code Duplication    Length = 7-10 lines in 2 locations

opcua/common/subscription.py 1 location

@@ 315-324 (lines=10) @@
312
        # server_handle is left as None in purpose as we don't get it yet.
313
        with self._lock:
314
            for mi in monitored_items:
315
                data = SubscriptionItemData()
316
                data.client_handle = mi.RequestedParameters.ClientHandle
317
                data.node = Node(self.server, mi.ItemToMonitor.NodeId)
318
                data.attribute = mi.ItemToMonitor.AttributeId
319
                data.mfilter = mi.RequestedParameters.Filter
320
                self._monitoreditems_map[mi.RequestedParameters.ClientHandle] = data
321
        results = self.server.create_monitored_items(params)
322
        mids = []
323
        # process result, add server_handle, or remove it if failed
324
        with self._lock:
325
            for idx, result in enumerate(results):
326
                mi = params.ItemsToCreate[idx]
327
                if not result.StatusCode.is_good():

opcua/server/event.py 1 location

@@ 111-117 (lines=7) @@
108
109
            def __init__(self):
110
                super(CustomEvent, self).__init__(extended=True)
111
                self.EventType = node.nodeid
112
                curr_node = node
113
114
                while curr_node.nodeid.Identifier != parent_identifier:
115
                    for prop in curr_node.get_properties():
116
                        setattr(self, prop.get_browse_name().Name, prop.get_value())
117
                    parents = curr_node.get_referenced_nodes(refs=ua.ObjectIds.HasSubtype, direction=ua.BrowseDirection.Inverse, includesubtypes=False)
118
                    if len(parents) != 1: # Something went wrong
119
                        return None
120
                    curr_node = parents[0]