menderbot.source_file.SourceFile.is_unicode()   A
last analyzed

Complexity

Conditions 1

Size

Total Lines 2
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 2
nop 1
dl 0
loc 2
rs 10
c 0
b 0
f 0
1
import itertools
2
import os
3
import tempfile
4
from dataclasses import dataclass
5
from pathlib import Path
6
from typing import Iterable
7
8
import rich_click as click
9
from charset_normalizer import from_path
10
11
12
@dataclass
class Insertion:
    """A piece of text to be inserted into a source file at a given position."""

    # The text to insert (no trailing newline; one is appended for full-line
    # insertions by `insert_in_lines`).
    text: str
    # 1-indexed number of the line the insertion targets.
    line_number: int
    # Tag attached to the insertion; not read by the insertion logic itself.
    # NOTE(review): presumably identifies the kind/origin of the insertion -
    # confirm against callers.
    label: str
    # 1-indexed column at which to splice `text`; only meaningful with `inline`.
    col: int = -1  # Use with `inline`
    inline: bool = False  # Insert into existing line instead of adding new line
19
20
21
def partition(pred, iterable):
    """Use a predicate to partition entries into false entries and true entries.

    Returns a 2-tuple of lazy iterators ``(false_entries, true_entries)``:
    the first yields the items for which ``pred`` is falsy, the second the
    items for which it is truthy. Relative order is preserved in both.

    Example: partition(is_odd, range(10)) --> 0 2 4 6 8   and  1 3 5 7 9
    """
    # tee() lets both result iterators walk the same underlying iterable
    # independently, even when it can only be consumed once.
    false_source, true_source = itertools.tee(iterable)
    return itertools.filterfalse(pred, false_source), filter(pred, true_source)
26
27
28
def insert_in_lines(lines: Iterable[str], insertions: Iterable["Insertion"]):
    """Yield ``lines`` with ``insertions`` merged in.

    Insertions are grouped by ``line_number`` (1-indexed) using
    ``itertools.groupby``, so they must be supplied ordered by ascending
    line number. For every targeted line:

    * each non-inline insertion is yielded as a line of its own
      (``text`` plus ``"\\n"``) ahead of the targeted line;
    * each inline insertion is spliced into the targeted line at its
      1-indexed ``col``. A running offset equal to the length of the text
      already spliced in is added to each ``col``, so columns refer to the
      original line as long as the inline insertions for one line arrive in
      ascending column order.

    Lines that no insertion targets are passed through unchanged. An inline
    insertion that targets a line past the end of the input is applied to an
    empty string.

    Raises:
        ValueError: from ``itertools.islice`` if a group's line number is
            lower than the number of the next unread line (for example when
            the insertions are not sorted).
    """
    # TODO: verbose flag
    line_iter = iter(lines)
    next_line_number = 1  # 1-indexed number of the next unread input line
    grouped = itertools.groupby(insertions, key=lambda ins: ins.line_number)
    for line_number, group in grouped:
        # Pass through everything that precedes the targeted line.
        for line in itertools.islice(line_iter, line_number - next_line_number):
            yield line
            next_line_number += 1

        # The group is walked twice (full-line first, then inline), so it is
        # materialised once; a groupby sub-iterator can only be read once.
        group_items = list(group)

        for insertion in group_items:
            if not insertion.inline:
                yield insertion.text + "\n"

        # None means "targeted line not fetched yet". An explicit None check
        # is required because an empty string is a legitimate line value.
        line_to_edit = None
        col_offset = 0
        for insertion in group_items:
            if not insertion.inline:
                continue
            if line_to_edit is None:
                line_to_edit = next(line_iter, "")
            # insertion.col is 1-indexed, col is zero indexed
            col = insertion.col + col_offset - 1
            col_offset += len(insertion.text)
            line_to_edit = line_to_edit[:col] + insertion.text + line_to_edit[col:]
        if line_to_edit is not None:
            yield line_to_edit
            next_line_number += 1
    yield from line_iter
57
58
59
class SourceFile:
    """A source file on disk that can be rewritten with text insertions.

    The instance remembers the file's modification time at construction so
    that `update_file` can refuse to overwrite a file that changed in the
    meantime.
    """

    def __init__(self, path: str):
        """Record ``path`` and its current modification time.

        Raises:
            OSError: if the modification time of ``path`` cannot be read.
        """
        self.path = path
        # Filled in by load_source_as_utf8(); None until then.
        self.encoding = None
        self._initial_modified_time = os.path.getmtime(path)

    def load_source_as_utf8(self):
        """Detect the file's encoding and return its content encoded as UTF-8.

        The detected encoding name is stored in ``self.encoding``.

        Raises:
            click.FileError: if no plausible encoding could be detected.
        """
        loaded = from_path(self.path)
        best_guess = loaded.best()
        # best() yields None when no candidate encoding matched (e.g. binary
        # content); fail with a readable error instead of an AttributeError.
        if best_guess is None:
            raise click.FileError(self.path, "Could not detect the file encoding.")
        self.encoding = best_guess.encoding
        return best_guess.output(encoding="utf_8")

    def is_unicode(self):
        """Return True when the detected encoding name starts with ``"utf"``.

        Returns False when no encoding has been detected yet.
        """
        return self.encoding is not None and self.encoding.startswith("utf")

    def update_file(self, insertions: Iterable["Insertion"], suffix: str) -> None:
        """Apply ``insertions`` to the file and write the result.

        The result is written to the original path with ``suffix`` appended
        to its existing suffix; an empty ``suffix`` overwrites the file.

        Raises:
            click.FileError: if the file was modified after this object was
                created.
        """
        path_obj = Path(self.path)
        with path_obj.open("r", encoding=self.encoding) as filehandle:
            if self.modified_after_loaded():
                raise click.FileError(
                    self.path, "File was externally modified, try again."
                )
            new_lines = list(insert_in_lines(lines=filehandle, insertions=insertions))
        # Write only once the input handle is closed: with an empty suffix the
        # output path is the input path, and replacing a file that is still
        # open fails on Windows.
        out_file = path_obj.with_suffix(f"{path_obj.suffix}{suffix}")
        self._write_result(new_lines, out_file)

    def _write_result(self, lines: list, output_file: Path) -> None:
        """Write ``lines`` to ``output_file`` via a temporary file and a rename."""
        # The temporary directory lives next to the target so the final
        # replace() is a rename within one filesystem; a rename from the
        # system temp directory can fail when it is on a different filesystem.
        with tempfile.TemporaryDirectory(dir=str(output_file.parent)) as tempdir:
            my_tempfile: Path = Path(tempdir) / "output.txt"
            # NOTE(review): the file is read with self.encoding but written
            # with the platform default encoding - confirm which encoding the
            # output is meant to have.
            with my_tempfile.open("w") as filehandle:
                filehandle.writelines(lines)
            my_tempfile.replace(output_file)

    def modified_after_loaded(self) -> bool:
        """Return True if the file's mtime is newer than at construction."""
        return os.path.getmtime(self.path) > self._initial_modified_time
95