1
|
|
|
"""Methods for list of ranges [inclusive, inclusive]""" |
2
|
|
|
# pylint: disable=too-many-branches |
3
|
|
|
import bisect |
4
|
|
|
from copy import deepcopy |
5
|
|
|
from typing import Iterator, Optional, Union |
6
|
|
|
|
7
|
|
|
from kytos.core.exceptions import KytosInvalidTagRanges |
8
|
|
|
|
9
|
|
|
|
10
|
|
|
def get_special_tags(tag_range: list[str], default) -> list[str]:
    """Validate a list of special tags against the supported values.

    Args:
        tag_range: Special tag values to validate.
        default: Iterable holding every supported special tag value.

    Returns:
        The very same ``tag_range`` object when it is valid.

    Raises:
        KytosInvalidTagRanges: If ``tag_range`` repeats a value or contains
            a value that is not present in ``default``.
    """
    # A set drops repeated values, so a size mismatch reveals duplicates.
    if len(tag_range) != len(set(tag_range)):
        raise KytosInvalidTagRanges(
            "There are duplicated values in the range."
        )

    # Duplicates were ruled out above, so a plain membership test is enough
    # to detect unsupported values.
    supported = set(default)
    for tag in tag_range:
        if tag not in supported:
            raise KytosInvalidTagRanges(f"The tag {tag} is not supported")
    return tag_range
26
|
|
|
|
27
|
|
|
|
28
|
|
|
def map_singular_values(tag_range: Union[int, list[int]]):
    """Expand a single value into a two-element ``[value, value]`` range.

    Args:
        tag_range: An integer, a one-element list, or any other sequence.

    Returns:
        ``[value, value]`` when given an integer or a one-element list;
        otherwise the argument itself, untouched.
    """
    if isinstance(tag_range, int):
        return [tag_range, tag_range]
    if len(tag_range) == 1:
        single = tag_range[0]
        return [single, single]
    return tag_range
36
|
|
|
|
37
|
|
|
|
38
|
|
|
def get_tag_ranges(ranges: list[list[int]]):
    """Normalise and validate a list of ``[inclusive, inclusive]`` ranges.

    Each entry is first passed through ``map_singular_values`` (so ``10``
    or ``[10]`` becomes ``[10, 10]``) and written back into ``ranges``,
    which is therefore modified in place.

    Validation rules:
        - every range must satisfy ``start <= end``;
        - ranges must be in ascending order;
        - no unnecessary partition (a range starting right after the
          previous one ends) and no repetition (a range starting where the
          previous one ends);
        - the first start must be at least 1 and the last end at most 4095.

    Args:
        ranges: Ranges (or singular values) to validate.

    Returns:
        The same ``ranges`` list with every entry normalised.

    Raises:
        KytosInvalidTagRanges: If ``ranges`` is empty or breaks any rule.
    """
    if len(ranges) == 0:
        raise KytosInvalidTagRanges("Tag range is empty")

    # 0 doubles as "no previous range yet": valid tags start at 1.
    previous_end = 0
    for position, raw_range in enumerate(ranges):
        current = map_singular_values(raw_range)
        ranges[position] = current
        start, end = current[0], current[1]

        if start > end:
            raise KytosInvalidTagRanges(
                f"The range {current} is not ordered"
            )
        if previous_end:
            if previous_end > start:
                raise KytosInvalidTagRanges(
                    f"Tag ranges are not ordered. {previous_end}"
                    f" is higher than {start}"
                )
            if previous_end == start - 1:
                raise KytosInvalidTagRanges(
                    "Tag ranges have an unnecessary partition. "
                    f"{previous_end} is before to {start}"
                )
            if previous_end == start:
                raise KytosInvalidTagRanges(
                    f"Tag ranges have repetition. {ranges[position - 1]}"
                    f" have same values as {current}"
                )
        previous_end = end

    # Ordering was enforced above, so checking both extremes is enough.
    if ranges[-1][1] > 4095:
        raise KytosInvalidTagRanges("Maximum value for a tag is 4095")
    if ranges[0][0] < 1:
        raise KytosInvalidTagRanges("Minimum value for a tag is 1")
    return ranges
75
|
|
|
|
76
|
|
|
|
77
|
|
|
def get_validated_tags(
    tags: Union[list[int], list[list[int]]]
) -> Union[list[int], list[list[int]]]:
    """Return ``tags`` once its values are validated to be correct.

    Args:
        tags: Either a single range given as a list of one or two integers,
            or a list of ranges (list of lists).

    Returns:
        For a list of integers: ``[value, value]`` when it has one element,
        otherwise ``tags`` itself. For a list of lists: the result of
        ``get_tag_ranges(tags)``.

    Raises:
        KytosInvalidTagRanges: If a two-value range is out of order, a list
            of integers has more than two values, or ``tags`` is not a
            recognised shape (including an empty list).
    """
    # The shape is decided by looking at the first element, so an empty
    # list must be excluded first; it falls through to the final raise
    # instead of failing with IndexError.
    if isinstance(tags, list) and tags:
        first = tags[0]
        if isinstance(first, int):
            if len(tags) == 1:
                return [first, first]
            if len(tags) == 2 and first > tags[1]:
                raise KytosInvalidTagRanges(f"Range out of order {tags}")
            if len(tags) > 2:
                raise KytosInvalidTagRanges(
                    f"Range must have 2 values {tags}"
                )
            return tags
        if isinstance(first, list):
            return get_tag_ranges(tags)
    raise KytosInvalidTagRanges(f"Value type not recognized {tags}")
92
|
|
|
|
93
|
|
|
|
94
|
|
|
def range_intersection(
    ranges_a: list[list[int]],
    ranges_b: list[list[int]]
) -> Iterator[list[int]]:
    """Lazily yield the intersection of two validated lists of ranges.

    Both arguments must already be ordered and validated, e.g.
    ``[[1, 2], [4, 60]]``. Use ``get_tag_ranges()`` for ``list[list[int]]``
    or ``get_validated_tags()`` when ``list[int]`` is also possible.

    Args:
        ranges_a: First ordered list of ``[start, end]`` ranges.
        ranges_b: Second ordered list of ``[start, end]`` ranges.

    Yields:
        A new ``[start, end]`` list for every overlapping section.
    """
    idx_a = idx_b = 0
    while idx_a < len(ranges_a) and idx_b < len(ranges_b):
        low_a, high_a = ranges_a[idx_a]
        low_b, high_b = ranges_b[idx_b]

        # No overlap: drop whichever range finishes first.
        if high_a < low_b:
            idx_a += 1
            continue
        if high_b < low_a:
            idx_b += 1
            continue

        yield [max(low_a, low_b), min(high_a, high_b)]

        # The range that ends first cannot intersect anything else.
        if high_a < high_b:
            idx_a += 1
        else:
            idx_b += 1
125
|
|
|
|
126
|
|
|
|
127
|
|
|
def range_difference(
    ranges_a: list[Optional[list[int]]],
    ranges_b: list[Optional[list[int]]]
) -> list[list[int]]:
    """Compute ``ranges_a - ranges_b`` for two validated lists of ranges.

    Behaves like a set difference over the values covered by the ranges.

    Both arguments must already be ordered and validated, e.g.
    ``[[1, 2], [4, 60]]``. Use ``get_tag_ranges()`` for ``list[list[int]]``
    or ``get_validated_tags()`` when ``list[int]`` is also possible.

    Args:
        ranges_a: Ranges to subtract from.
        ranges_b: Ranges to remove.

    Returns:
        A new list with the parts of ``ranges_a`` not covered by
        ``ranges_b``. It is a deep copy of ``ranges_a`` when ``ranges_b``
        is empty and ``[]`` when ``ranges_a`` is empty.
    """
    if not ranges_a:
        return []
    if not ranges_b:
        return deepcopy(ranges_a)

    remaining = []
    idx_a = idx_b = 0
    total_a, total_b = len(ranges_a), len(ranges_b)
    # False while a partially consumed range from ranges_a (its start was
    # moved past a removed section) still has to be processed.
    reload_a = True
    while idx_a < total_a and idx_b < total_b:
        if reload_a:
            low_a, high_a = ranges_a[idx_a]
        reload_a = True
        low_b, high_b = ranges_b[idx_b]

        # No overlap: keep the whole range or skip the removal range.
        if high_a < low_b:
            remaining.append([low_a, high_a])
            idx_a += 1
            continue
        if high_b < low_a:
            idx_b += 1
            continue

        # Overlap: keep whatever sits before the removed section.
        if low_a < low_b:
            remaining.append([low_a, low_b - 1])
        if high_a > high_b:
            # Something is left after the removed section; carry it over.
            low_a = high_b + 1
            reload_a = False
            idx_b += 1
        else:
            idx_a += 1

    # Flush the carried-over tail, then the untouched rest of ranges_a.
    if not reload_a:
        remaining.append([low_a, high_a])
        idx_a += 1
    remaining.extend([low, high] for low, high in ranges_a[idx_a:])
    return remaining
179
|
|
|
|
180
|
|
|
|
181
|
|
|
def range_addition(
    ranges_a: list[Optional[list[int]]],
    ranges_b: list[Optional[list[int]]]
) -> tuple[list[list[int]], list[list[int]]]:
    """Add two validated lists of ranges, like a union of two sets.

    Both arguments must already be ordered and validated, e.g.
    ``[[1, 2], [4, 60]]``. Use ``get_tag_ranges()`` for ``list[list[int]]``
    or ``get_validated_tags()`` when ``list[int]`` is also possible.

    Args:
        ranges_a: First ordered list of ``[start, end]`` ranges.
        ranges_b: Second ordered list of ``[start, end]`` ranges.

    Returns:
        A tuple ``(addition, intersection)``: the merged ranges, with
        overlapping and contiguous ranges joined, and the sections present
        in both inputs. The returned ranges never share list objects with
        the arguments.
    """
    if not ranges_b:
        return deepcopy(ranges_a), []
    if not ranges_a:
        return deepcopy(ranges_b), []

    result = []
    conflict = []
    a_i = b_i = 0
    len_a = len(ranges_a)
    len_b = len(ranges_b)
    while a_i < len_a or b_i < len_b:
        a_done = a_i >= len_a
        b_done = b_i >= len_b
        # "- 1" keeps contiguous ranges (e.g. [1, 5] and [6, 8]) out of
        # the stand-alone branches so they get merged below.
        if not a_done and (
            b_done or ranges_a[a_i][1] < ranges_b[b_i][0] - 1
        ):
            # Copy so that the result does not alias the caller's lists,
            # matching the deep copies returned for empty inputs.
            result.append(list(ranges_a[a_i]))
            a_i += 1
        elif not b_done and (
            a_done or ranges_b[b_i][1] < ranges_a[a_i][0] - 1
        ):
            result.append(list(ranges_b[b_i]))
            b_i += 1
        else:
            # Intersecting or contiguous ranges: merge them.
            fst = max(ranges_a[a_i][0], ranges_b[b_i][0])
            snd = min(ranges_a[a_i][1], ranges_b[b_i][1])
            if fst <= snd:
                conflict.append([fst, snd])
            new_range = [
                min(ranges_a[a_i][0], ranges_b[b_i][0]),
                max(ranges_a[a_i][1], ranges_b[b_i][1]),
            ]
            a_i += 1
            b_i += 1
            # Keep absorbing ranges from either list while they touch or
            # overlap the range being built.
            while True:
                if a_i < len_a and ranges_a[a_i][0] <= new_range[1] + 1:
                    candidate = ranges_a[a_i]
                    a_i += 1
                elif b_i < len_b and ranges_b[b_i][0] <= new_range[1] + 1:
                    candidate = ranges_b[b_i]
                    b_i += 1
                else:
                    break
                if candidate[0] <= new_range[1]:
                    # Real overlap (not just contiguity): record it.
                    conflict.append([
                        max(candidate[0], new_range[0]),
                        min(candidate[1], new_range[1]),
                    ])
                new_range[1] = max(candidate[1], new_range[1])
            result.append(new_range)
    return result, conflict
246
|
|
|
|
247
|
|
|
|
248
|
|
|
def find_index_remove(
    available_tags: list[list[int]],
    tag_range: list[int]
) -> Optional[int]:
    """Find which entry of ``available_tags`` fully contains ``tag_range``.

    Args:
        available_tags: Sorted list of ``[start, end]`` ranges.
        tag_range: ``[start, end]`` range that is going to be removed.

    Returns:
        The index of the containing range, or ``None`` when no single
        range covers ``tag_range`` entirely.
    """
    position = bisect.bisect_left(available_tags, tag_range)
    # The containing range is either at the insertion point (same start)
    # or right before it (smaller start).
    for candidate in (position, position - 1):
        if 0 <= candidate < len(available_tags):
            holder = available_tags[candidate]
            if holder[0] <= tag_range[0] and tag_range[1] <= holder[1]:
                return candidate
    return None
263
|
|
|
|
264
|
|
|
|
265
|
|
|
def find_index_add(
    available_tags: list[list[int]],
    tags: list[int]
) -> Optional[int]:
    """Find where ``tags`` can be inserted into ``available_tags``.

    This method assumes that tags is within self.tag_ranges.

    Args:
        available_tags: Sorted list of ``[start, end]`` ranges.
        tags: ``[start, end]`` range to be added.

    Returns:
        The insertion index when ``tags`` overlaps neither neighbouring
        range, otherwise ``None``.
    """
    position = bisect.bisect_left(available_tags, tags)

    # Overlap with the range sitting right before the insertion point.
    if position != 0 and tags[0] <= available_tags[position - 1][1]:
        return None
    # Overlap with the range currently at the insertion point.
    if (position != len(available_tags)
            and tags[1] >= available_tags[position][0]):
        return None
    return position
277
|
|
|
|