generate_sankey_diagram_data.update_counts()   A
last analyzed

Complexity

Conditions 5

Size

Total Lines 10
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 5
eloc 10
nop 3
dl 0
loc 10
rs 9.3333
c 0
b 0
f 0
1
from ast import literal_eval
2
from collections import defaultdict
3
from typing import Any
4
5
import pandas as pd
6
7
# TODO: render the Sankey diagram with plotly.graph_objects instead of exporting a CSV for Flourish
8
9
10
def parse_markdown_list(markdown: str) -> defaultdict[Any, dict[str, int | defaultdict]]:
    """Parse an indented Markdown bullet list into a nested tree of counters.

    Leading front matter is removed via ``skip_frontmatter``. Every remaining
    non-empty line is treated as one list item whose depth is its leading
    whitespace divided by four. Each node has the shape
    ``{"count": int, "children": mapping of item text -> node}``.

    Args:
        markdown: Full text of the Markdown document.

    Returns:
        Mapping of top-level item text to its node. An item's ``count`` is
        incremented once for every line on which it appears under the same
        parent, so items listed once start at 1.
    """
    lines = skip_frontmatter(markdown.strip().split("\n"))
    root: defaultdict[Any, dict[str, int | defaultdict]] = defaultdict(
        lambda: {"count": 0, "children": defaultdict()}
    )
    # stack[i] is the most recently seen node at depth i.
    stack: list[dict[str, Any]] = []

    for line in lines:
        indent_level = (len(line) - len(line.lstrip())) // 4
        # Remove the bullet marker from the left only, so that dashes at the
        # end of the item text are preserved.
        item = line.strip().lstrip("- ").strip()
        if not item:
            # Blank or bullet-only lines carry no item. Recording them would
            # create a "" node and re-parent the indented lines that follow.
            continue

        # Drop nodes that are at the same depth or deeper than this line.
        while len(stack) > indent_level:
            stack.pop()

        current = stack[-1]["children"] if stack else root

        if item not in current:
            current[item] = {"count": 0, "children": defaultdict()}
        current[item]["count"] += 1
        stack.append(current[item])
    return root
32
33
34
def skip_frontmatter(lines: list[str]) -> list[str]:
    """Return ``lines`` without a leading ``---`` delimited front matter block.

    Args:
        lines: Lines of a document, without trailing newlines.

    Returns:
        The lines after the closing ``---`` delimiter. The input is returned
        unchanged when it is empty, does not start with ``---``, or has no
        closing delimiter.
    """
    # Guard against an empty list, where lines[0] would raise IndexError.
    if not lines or not lines[0].startswith("---"):
        return lines

    # Start at 1 so the opening delimiter is not mistaken for the closing one.
    for index, line in enumerate(lines[1:], start=1):
        if line.startswith("---"):
            return lines[index + 1:]

    # Unterminated front matter: keep everything rather than dropping content.
    return lines
43
44
45
def update_counts(category: str, structure: dict, path: list) -> bool:
46
    if category in structure:
47
        structure[category]["count"] += 1
48
        for parent in path:
49
            parent["count"] += 1
50
        return True
51
    for key, value in structure.items():
52
        if update_counts(category, value["children"], path + [value]):
53
            return True
54
    return False
55
56
57
def update_values_from_csv(structure: dict, csv_data: pd.DataFrame) -> None:
    """Add the categories listed in the first CSV column to the tree counts.

    Each cell of the first column is parsed with ``ast.literal_eval`` and
    iterated; every resulting category is passed to ``update_counts``.
    ``structure`` is modified in place.

    Args:
        structure: Count tree as produced by ``parse_markdown_list``.
        csv_data: Frame whose first column holds Python-literal collections
            of category names (presumably lists such as ``"['a', 'b']"`` -
            verify against the CSV export).
    """
    if csv_data.shape[1] == 0:
        return

    # Select the first column by position: plain [0] on a row is label-based
    # and only works positionally through a deprecated pandas fallback.
    # Missing cells are skipped because literal_eval cannot parse NaN.
    for cell in csv_data.iloc[:, 0].dropna():
        for category in literal_eval(cell):
            update_counts(category, structure, [])
62
63
64
def generate_sankey_data(structure: dict, step: int = 0, parent: str | None = None) -> list:
65
    data = []
66
    for key, value in structure.items():
67
        if parent is not None:
68
            data.append(
69
                {
70
                    "Source": parent,
71
                    "Dest": key,
72
                    "Value": value["count"],
73
                    "Step from": step,
74
                    "Step to": step + 1,
75
                }
76
            )
77
        if value["children"]:
78
            data.extend(generate_sankey_data(value["children"], step + 1, key))
79
    return data
80
81
82
def prune_sankey_data(data: list, max_depth: int) -> list:
    """Keep only the link rows whose "Step to" does not exceed ``max_depth``.

    Args:
        data: Rows as produced by ``generate_sankey_data``.
        max_depth: Largest "Step to" value to keep (inclusive).

    Returns:
        A new list with the retained rows in their original order.
    """
    return [row for row in data if row["Step to"] <= max_depth]
84
85
86
def main(
    markdown_file_path: str,
    csv_file_path: str,
    max_depth: int,
    output_file_path: str = "/Users/gymate1/Downloads/sankey_data.csv",
) -> None:
    """Build Sankey link data from a category list and a CSV, and save it.

    Reads the Markdown category hierarchy, adds the category occurrences found
    in the CSV, flattens the tree into link rows, drops rows deeper than
    ``max_depth`` and writes the result as CSV.

    Args:
        markdown_file_path: Path of the Markdown file with the bullet list.
        csv_file_path: Path of the header-less CSV with the category lists.
        max_depth: Largest "Step to" value kept in the output.
        output_file_path: Destination of the generated CSV.
    """
    # Read as UTF-8 explicitly so the result does not depend on the locale.
    with open(markdown_file_path, "r", encoding="utf-8") as file:
        markdown_list = file.read()
    parsed_structure = parse_markdown_list(markdown_list)

    csv_df = pd.read_csv(csv_file_path, header=None)
    update_values_from_csv(parsed_structure, csv_df)
    sankey_data = generate_sankey_data(parsed_structure)
    pruned_sankey_data = prune_sankey_data(sankey_data, max_depth)
    df = pd.DataFrame(
        pruned_sankey_data, columns=["Source", "Dest", "Value", "Step from", "Step to"]
    )
    # Let pandas write the file directly; it handles encoding and newlines.
    df.to_csv(output_file_path, index=False)
101
102
103
# Script entry point. The paths below are specific to the author's machine.
if __name__ == "__main__":
    main(
        markdown_file_path="/Users/gymate1/Desktop/drop/python/kalauz/mindmap/SR_cause_categories.md",
        csv_file_path="/Users/gymate1/Downloads/kalauz_speed_restrictions.csv",
        max_depth=4,
    )
109