Code Duplication    Length = 23-23 lines in 2 locations

opcua/server/history.py 2 locations

@@ 147-169 (lines=23) @@
144
        if count and len(evts) > count:
145
            evts.pop(0)
146
147
    def read_event_history(self, source_id, start, end, nb_values, evfilter):
        """
        Return the stored events of ``source_id`` selected by a time window.

        ``start`` and ``end`` are compared against each event's ``Time``.
        ``None`` for either bound is replaced by ``ua.get_win_epoch()``, and
        a bound equal to that value is treated as "not specified":

        * ``start`` unspecified: events are walked in reverse storage order
          and kept when ``start <= Time``.
        * ``end`` unspecified: events are walked in storage order and kept
          when ``start <= Time``.
        * ``start > end``: events are walked in reverse storage order and
          kept when ``end <= Time <= start``.
        * otherwise: events are walked in storage order and kept when
          ``start <= Time <= end``.

        :param source_id: key of the event source in ``self._events``
        :param start: first bound of the window, or None
        :param end: second bound of the window, or None
        :param nb_values: maximum number of events to return; a falsy value
            (0 / None) disables the limit
        :param evfilter: not used by this implementation
        :return: ``(results, cont)``. ``cont`` is None unless the result was
            truncated to ``nb_values``, in which case it is the ``Time`` of
            the first event that was left out. An unknown ``source_id``
            yields ``([], None)`` after a warning is logged.
        """
        if source_id not in self._events:
            # Same reporting channel as read_node_history (logger, not stdout).
            self.logger.warning("Error attempt to read event history for a node which does not historize events")
            return [], None

        # Sentinel meaning "bound not specified"; computed once for all tests.
        epoch = ua.get_win_epoch()
        if start is None:
            start = epoch
        if end is None:
            end = epoch

        events = self._events[source_id]
        if start == epoch:
            # NOTE(review): ``end`` is not applied in this branch - confirm
            # that this is intended.
            results = [ev for ev in reversed(events) if start <= ev.Time]
        elif end == epoch:
            results = [ev for ev in events if start <= ev.Time]
        elif start > end:
            results = [ev for ev in reversed(events) if end <= ev.Time <= start]
        else:
            results = [ev for ev in events if start <= ev.Time <= end]

        cont = None
        if nb_values and len(results) > nb_values:
            # Index nb_values is the first event NOT returned. The previous
            # nb_values + 1 skipped one event and raised IndexError when
            # exactly one extra event matched.
            cont = results[nb_values].Time
            results = results[:nb_values]
        return results, cont
170
171
    def stop(self):
172
        pass
@@ 106-128 (lines=23) @@
103
        if count and len(data) > count:
104
            data.pop(0)
105
106
    def read_node_history(self, node_id, start, end, nb_values):
        """
        Return the stored data values of ``node_id`` selected by a time window.

        ``start`` and ``end`` are compared against each value's
        ``SourceTimestamp``. ``None`` for either bound is replaced by
        ``ua.get_win_epoch()``, and a bound equal to that value is treated
        as "not specified":

        * ``start`` unspecified: values are walked in reverse storage order
          and kept when ``start <= SourceTimestamp``.
        * ``end`` unspecified: values are walked in storage order and kept
          when ``start <= SourceTimestamp``.
        * ``start > end``: values are walked in reverse storage order and
          kept when ``end <= SourceTimestamp <= start``.
        * otherwise: values are walked in storage order and kept when
          ``start <= SourceTimestamp <= end``.

        :param node_id: key of the node in ``self._datachanges``
        :param start: first bound of the window, or None
        :param end: second bound of the window, or None
        :param nb_values: maximum number of values to return; a falsy value
            (0 / None) disables the limit
        :return: ``(results, cont)``. ``cont`` is None unless the result was
            truncated to ``nb_values``, in which case it is the
            ``SourceTimestamp`` of the first value that was left out. An
            unknown ``node_id`` yields ``([], None)`` after a warning is
            logged.
        """
        if node_id not in self._datachanges:
            self.logger.warning("Error attempt to read history for a node which is not historized")
            return [], None

        # Sentinel meaning "bound not specified"; computed once for all tests.
        epoch = ua.get_win_epoch()
        if start is None:
            start = epoch
        if end is None:
            end = epoch

        values = self._datachanges[node_id]
        if start == epoch:
            # NOTE(review): ``end`` is not applied in this branch - confirm
            # that this is intended.
            results = [dv for dv in reversed(values) if start <= dv.SourceTimestamp]
        elif end == epoch:
            results = [dv for dv in values if start <= dv.SourceTimestamp]
        elif start > end:
            results = [dv for dv in reversed(values) if end <= dv.SourceTimestamp <= start]
        else:
            results = [dv for dv in values if start <= dv.SourceTimestamp <= end]

        cont = None
        if nb_values and len(results) > nb_values:
            # Index nb_values is the first value NOT returned. The previous
            # nb_values + 1 skipped one value and raised IndexError when
            # exactly one extra value matched.
            cont = results[nb_values].SourceTimestamp
            results = results[:nb_values]
        return results, cont
129
130
    def new_historized_event(self, source_id, evtypes, period, count=0):
131
        if source_id in self._events: