visibility_graph   A
last analyzed

Complexity

Total Complexity 7

Size/Duplication

Total Lines 68
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 7
eloc 45
dl 0
loc 68
rs 10
c 0
b 0
f 0

3 Functions

Rating   Name   Duplication   Size   Complexity  
A obstruction_predicate() 0 8 1
A sliding_window_visibility_graph() 0 6 2
A visibility_graph() 0 19 4
1
import itertools
2
from collections.abc import Iterable
3
from typing import Optional
4
5
import more_itertools
6
import networkx as nx
7
from matplotlib import pyplot
8
9
10
def sliding_window_visibility_graph(series: Iterable[float], window_size: Optional[int] = None):
11
    if window_size:
12
        yield from itertools.chain(
13
            visibility_graph(subseries) for subseries in more_itertools.sliding_window(series, window_size))
14
    else:
15
        yield visibility_graph(series)
16
17
18
def obstruction_predicate(n1, t1, n2, t2):
19
    slope = (t2 - t1) / (n2 - n1)
20
    constant = t2 - slope * n2
21
22
    def is_obstruction(n, t):
23
        return t >= constant + slope * n
24
25
    return is_obstruction
26
27
28
def visibility_graph(series: Iterable[float]):
29
    graph = nx.Graph()
30
    # Check all combinations of nodes n series
31
32
    for s1, s2 in itertools.combinations(enumerate(series), 2):
33
        n1, t1 = s1
34
        n2, t2 = s2
35
36
        if n2 == n1 + 1:
37
            graph.add_node(n1, value=t1)
38
            graph.add_node(n2, value=t2)
39
            graph.add_edge(n1, n2)
40
        else:
41
            is_obstruction = obstruction_predicate(n1, t1, n2, t2)
42
            obstructed = any(is_obstruction(n, t) for n, t in enumerate(series) if n1 < n < n2)
43
            if not obstructed:
44
                graph.add_edge(n1, n2)
45
46
    return graph
47
48
49
if __name__ == '__main__':
50
    series_list = [
51
        # range(10),
52
        # [2, 1, 3, 2, 1, 3, 2, 1, 3, 2, 1, 3, 4],
53
        [-(x ** 3) for x in range(-10, 11)],
54
        # [2, 1, 3, 4, 2, 1, 4, 3, 1, 1, 3, 4, 2, 4, 1, 3],
55
        # random.sample(range(1000), 500)
56
    ]
57
58
    for s in series_list:
59
        g = visibility_graph(s)
60
        print(g)
61
62
        pos = [[x, 0] for x in range(len(s))]
63
        labels = nx.get_node_attributes(g, 'value')
64
        nx.draw_networkx_nodes(g, pos)
65
        nx.draw_networkx_labels(g, pos, labels=labels)
66
        nx.draw_networkx_edges(g, pos, arrows=True, connectionstyle='arc3,rad=-1.57079632679')
67
        pyplot.show()
68