Passed
Pull Request — master (#1267)
by
unknown
08:50
created

kytos.core.tag_ranges.get_tag_ranges()   D

Complexity

Conditions 12

Size

Total Lines 37
Code Lines 28

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 12
eloc 28
nop 1
dl 0
loc 37
rs 4.8
c 0
b 0
f 0

How to fix   Complexity   

Complexity

Complex classes like kytos.core.tag_ranges.get_tag_ranges() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""Methods for list of ranges [inclusive, inclusive]"""
2
# pylint: disable=too-many-branches
3
import bisect
4
from copy import deepcopy
5
from typing import Iterator, Optional, Union
6
7
from kytos.core.exceptions import KytosInvalidTagRanges
8
9
10
def get_special_tags(tag_range: list[str], default) -> list[str]:
    """Validate a list of special tags and return it unchanged.

    Args:
        tag_range: Special tags to validate.
        default: Iterable holding the supported special tags.

    Returns:
        The very same ``tag_range`` object that was passed in.

    Raises:
        KytosInvalidTagRanges: If ``tag_range`` holds duplicated values
            or a tag that is not present in ``default``.
    """
    # Duplicates collapse inside a set, so a size mismatch reveals them.
    if len(tag_range) != len(set(tag_range)):
        msg = "There are duplicated values in the range."
        raise KytosInvalidTagRanges(msg)

    # Duplicates were rejected above, so a membership test is enough.
    # Testing instead of catching KeyError keeps the raised error free of
    # an unrelated chained KeyError traceback.
    supported = set(default)
    for tag in tag_range:
        if tag not in supported:
            msg = f"The tag {tag} is not supported"
            raise KytosInvalidTagRanges(msg)
    return tag_range
26
27
28
def map_singular_values(tag_range: Union[int, list[int]]):
    """Expand a lone tag into an inclusive [tag, tag] pair.

    A bare integer or a one-element list becomes a new two-element list
    holding that value twice. Any other input is handed back untouched.
    """
    if isinstance(tag_range, int):
        return [tag_range, tag_range]
    if len(tag_range) == 1:
        only = tag_range[0]
        return [only, only]
    return tag_range
36
37
38
def _check_against_previous(
    last_tag: int,
    previous: list[int],
    current: list[int]
) -> None:
    """Raise unless `current` starts at least two tags after `last_tag`."""
    if last_tag > current[0]:
        msg = (f"Tag ranges are not ordered. {last_tag}"
               f" is higher than {current[0]}")
        raise KytosInvalidTagRanges(msg)
    if last_tag == current[0] - 1:
        # Adjacent ranges should have been written as a single range.
        msg = (f"Tag ranges have an unnecessary partition. "
               f"{last_tag} is before to {current[0]}")
        raise KytosInvalidTagRanges(msg)
    if last_tag == current[0]:
        msg = (f"Tag ranges have repetition. {previous}"
               f" have same values as {current}")
        raise KytosInvalidTagRanges(msg)


def get_tag_ranges(ranges: list[list[int]]):
    """Get tag_ranges and check validity:
    - It should be ordered
    - Not unnecessary partition (eg. [[10,20],[20,30]])
    - Singular integers are changed to ranges (eg. [10] to [[10, 10]])
    - Every range must end up with exactly 2 values
    - Values must be within 1 and 4095

    The ranges are understood as [inclusive, inclusive].

    Note that `ranges` is normalized in place (singular values are
    replaced by two-element lists) and the same list is returned.

    Raises:
        KytosInvalidTagRanges: If any of the checks above fails.
    """
    if len(ranges) < 1:
        msg = "Tag range is empty"
        raise KytosInvalidTagRanges(msg)
    # 0 means "no previous range yet"; valid tags start at 1.
    last_tag = 0
    for i, raw_range in enumerate(ranges):
        current = map_singular_values(raw_range)
        ranges[i] = current
        # Reject [] (would raise IndexError below) and ranges with more
        # than two values (would be accepted with the extras ignored).
        if len(current) != 2:
            msg = f"Range must have 2 values {current}"
            raise KytosInvalidTagRanges(msg)
        if current[0] > current[1]:
            msg = f"The range {current} is not ordered"
            raise KytosInvalidTagRanges(msg)
        if last_tag:
            _check_against_previous(last_tag, ranges[i - 1], current)
        last_tag = current[1]
    # The list is ordered at this point, so only the extremes need checks.
    if ranges[-1][1] > 4095:
        msg = "Maximum value for a tag is 4095"
        raise KytosInvalidTagRanges(msg)
    if ranges[0][0] < 1:
        msg = "Minimum value for a tag is 1"
        raise KytosInvalidTagRanges(msg)
    return ranges
75
76
77
def get_validated_tags(
    tags: Union[list[int], list[list[int]]]
) -> Union[list[int], list[list[int]]]:
    """Return tags whose values are validated to be correct.

    Args:
        tags: Either a single range given as a flat list of integers
            (``[value]`` or ``[start, end]``) or a list of ranges.

    Returns:
        For a flat list, ``[value, value]`` when it has one element,
        otherwise the same list. For a list of ranges, whatever
        ``get_tag_ranges()`` returns.

    Raises:
        KytosInvalidTagRanges: If ``tags`` is not a list, is empty, is a
            flat range that is out of order or has more than 2 values,
            or has a first element that is neither an int nor a list.
    """
    if not isinstance(tags, list):
        raise KytosInvalidTagRanges(f"Value type not recognized {tags}")
    if not tags:
        # Checked before peeking at tags[0], which would otherwise leak
        # an IndexError to callers expecting KytosInvalidTagRanges.
        raise KytosInvalidTagRanges("Tag range is empty")
    if isinstance(tags[0], int):
        if len(tags) == 1:
            return [tags[0], tags[0]]
        if len(tags) == 2 and tags[0] > tags[1]:
            raise KytosInvalidTagRanges(f"Range out of order {tags}")
        if len(tags) > 2:
            raise KytosInvalidTagRanges(f"Range must have 2 values {tags}")
        return tags
    if isinstance(tags[0], list):
        return get_tag_ranges(tags)
    raise KytosInvalidTagRanges(f"Value type not recognized {tags}")
92
93
94
def range_intersection(
    ranges_a: list[list[int]],
    ranges_b: list[list[int]]
) -> Iterator[list[int]]:
    """Lazily yield every overlap between two validated lists of ranges.

    Necessities:
        Both arguments need to be ordered and validated.
        E.g. [[1, 2], [4, 60]]
        Use get_tag_ranges() for list[list[int]] or
            get_validated_tags() for also list[int]

    Each yielded item is a new [start, end] list, both ends inclusive.
    """
    pos_a = pos_b = 0
    while pos_a < len(ranges_a) and pos_b < len(ranges_b):
        low_a, high_a = ranges_a[pos_a]
        low_b, high_b = ranges_b[pos_b]
        if high_a < low_b:
            # Current a-range lies entirely before the current b-range.
            pos_a += 1
            continue
        if high_b < low_a:
            # Current b-range lies entirely before the current a-range.
            pos_b += 1
            continue
        yield [max(low_a, low_b), min(high_a, high_b)]
        # Drop whichever range ends first; the other may still overlap
        # with what comes next on the opposite list.
        if high_a < high_b:
            pos_a += 1
        else:
            pos_b += 1
125
126
127
def range_difference(
    ranges_a: list[Optional[list[int]]],
    ranges_b: list[Optional[list[int]]]
) -> list[list[int]]:
    """Subtract one validated list of ranges from another
     (ranges_a - ranges_b).
    This method simulates difference of sets.

    Necessities:
        The lists from argument need to be ordered and validated.
        E.g. [[1, 2], [4, 60]]
        Use get_tag_ranges() for list[list[int]] or
            get_validated_tags() for also list[int]

    Returns:
        A new list with the parts of ``ranges_a`` that are not covered
        by ``ranges_b``. The inputs are not modified.
    """
    if not ranges_a:
        return []
    if not ranges_b:
        return deepcopy(ranges_a)
    result = []
    b_i = 0
    len_b = len(ranges_b)
    for start_a, end_a in ranges_a:
        # Skip every b-range that ends before this a-range begins.
        while b_i < len_b and ranges_b[b_i][1] < start_a:
            b_i += 1
        covered = False
        # Carve out each b-range that starts inside what is left of a.
        while b_i < len_b and ranges_b[b_i][0] <= end_a:
            start_b, end_b = ranges_b[b_i]
            if start_a < start_b:
                result.append([start_a, start_b - 1])
            if end_b >= end_a:
                # The tail of this a-range is fully removed. Keep the
                # b-range: it may also overlap the next a-range.
                covered = True
                break
            # Continue with the piece of a located after this b-range.
            start_a = end_b + 1
            b_i += 1
        if not covered:
            result.append([start_a, end_a])
    return result
179
180
181
def range_addition(
    ranges_a: list[Optional[list[int]]],
    ranges_b: list[Optional[list[int]]]
) -> tuple[list[list[int]], list[list[int]]]:
    """Addition between two validated lists of ranges.
     Simulates the addition (union) between two sets.
     Return[addition product, intersection]

     Necessities:
        The lists from argument need to be ordered and validated.
        E.g. [[1, 2], [4, 60]]
        Use get_tag_ranges() for list[list[int]] or
            get_validated_tags() for also list[int]

     The returned lists never share inner lists with the arguments, so
     they can be modified without touching ranges_a or ranges_b.
     """
    if not ranges_b:
        return deepcopy(ranges_a), []
    if not ranges_a:
        return deepcopy(ranges_b), []
    result = []
    conflict = []
    a_i = b_i = 0
    len_a = len(ranges_a)
    len_b = len(ranges_b)
    while a_i < len_a or b_i < len_b:
        # The "- 1" keeps touching ranges (e.g. [1, 5] and [6, 9]) out of
        # the two stand-alone branches so that they get merged below.
        if (a_i < len_a and
                (b_i >= len_b or ranges_a[a_i][1] < ranges_b[b_i][0] - 1)):
            # Copy so the result does not alias the caller's inner list.
            result.append(list(ranges_a[a_i]))
            a_i += 1
        elif (b_i < len_b and
                (a_i >= len_a or ranges_b[b_i][1] < ranges_a[a_i][0] - 1)):
            result.append(list(ranges_b[b_i]))
            b_i += 1
        # Intersection and continuous ranges
        else:
            fst = max(ranges_a[a_i][0], ranges_b[b_i][0])
            snd = min(ranges_a[a_i][1], ranges_b[b_i][1])
            # Only a real overlap is a conflict; touching ranges are not.
            if fst <= snd:
                conflict.append([fst, snd])
            new_range = [
                min(ranges_a[a_i][0], ranges_b[b_i][0]),
                max(ranges_a[a_i][1], ranges_b[b_i][1])
            ]
            a_i += 1
            b_i += 1
            # Keep absorbing following ranges from either list while they
            # overlap or touch the range being built.
            while a_i < len_a or b_i < len_b:
                if a_i < len_a and (ranges_a[a_i][0] <= new_range[1] + 1):
                    if ranges_a[a_i][0] <= new_range[1]:
                        conflict.append([
                            max(ranges_a[a_i][0], new_range[0]),
                            min(ranges_a[a_i][1], new_range[1])
                        ])
                    new_range[1] = max(ranges_a[a_i][1], new_range[1])
                    a_i += 1
                elif b_i < len_b and (ranges_b[b_i][0] <= new_range[1] + 1):
                    if ranges_b[b_i][0] <= new_range[1]:
                        conflict.append([
                            max(ranges_b[b_i][0], new_range[0]),
                            min(ranges_b[b_i][1], new_range[1])
                        ])
                    new_range[1] = max(ranges_b[b_i][1], new_range[1])
                    b_i += 1
                else:
                    break
            result.append(new_range)
    return result, conflict
246
247
248
def find_index_remove(
    available_tags: list[list[int]],
    tag_range: list[int]
) -> Optional[int]:
    """Return the index of the available range that contains tag_range.

    The insertion point reported by bisect is tried first, then the
    position right before it. None is returned when neither of those
    ranges fully contains tag_range.
    """
    position = bisect.bisect_left(available_tags, tag_range)
    for candidate in (position, position - 1):
        if not 0 <= candidate < len(available_tags):
            continue
        holder = available_tags[candidate]
        if tag_range[0] >= holder[0] and tag_range[1] <= holder[1]:
            return candidate
    return None
263
264
265
def find_index_add(
    available_tags: list[list[int]],
    tags: list[int]
) -> Optional[int]:
    """Return the position where tags can be inserted in available_tags.

    This method assumes that tags is within self.tag_ranges.
    None is returned when tags reaches into the range located right
    before or right at the insertion point.
    """
    position = bisect.bisect_left(available_tags, tags)
    # Clash with the range on the left of the insertion point.
    if position > 0 and tags[0] <= available_tags[position - 1][1]:
        return None
    # Clash with the range currently sitting at the insertion point.
    if (position < len(available_tags) and
            tags[1] >= available_tags[position][0]):
        return None
    return position
277