Passed
Push — main ( 65730f...fad324 )
by Douglas
06:54 queued 02:27
created

MultiSearch._build_and_test()   A

Complexity

Conditions 4

Size

Total Lines 20
Code Lines 16

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 4
eloc 16
nop 1
dl 0
loc 20
rs 9.6
c 0
b 0
f 0
1
"""
2
Runner.
3
"""
4
5
from __future__ import annotations
6
7
from dataclasses import dataclass
8
from datetime import datetime
9
from pathlib import Path
10
from typing import Any, Mapping, MutableMapping, Optional, Sequence, Type, Union
11
12
import pandas as pd
0 ignored issues
show
introduced by
Unable to import 'pandas'
Loading history...
13
import typer
0 ignored issues
show
introduced by
Unable to import 'typer'
Loading history...
14
from pocketutils.core.exceptions import InjectionError
0 ignored issues
show
introduced by
Unable to import 'pocketutils.core.exceptions'
Loading history...
15
from typeddfs import Checksums, TypedDfs
0 ignored issues
show
introduced by
Unable to import 'typeddfs'
Loading history...
16
from typeddfs.abs_dfs import AbsDf
0 ignored issues
show
introduced by
Unable to import 'typeddfs.abs_dfs'
Loading history...
17
18
from mandos.entry.abstract_entries import Entry
19
from mandos.entry.api_singletons import Apis
0 ignored issues
show
Unused Code introduced by
Unused Apis imported from mandos.entry.api_singletons
Loading history...
20
from mandos.entry.entry_commands import Entries
21
from mandos.entry.utils._arg_utils import EntryUtils
22
from mandos.model.hit_dfs import HitDf
23
from mandos.model.settings import SETTINGS
24
from mandos.model.utils.setup import LOG_SETUP, logger
25
26
# Typer application object for this module; no commands are registered on it in the visible code.
cli = typer.Typer()
27
28
# Lookup table from each entry's command name (``Entry.cmd()``) to its Entry class.
# ``CmdRunner.build`` uses it to resolve the ``source`` value of a search config row.
EntriesByCmd: MutableMapping[str, Type[Entry]] = {e.cmd(): e for e in Entries}
29
30
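# These parameters are filled in by MultiSearch._build_command and CmdRunner.build,
# so they are not permitted in the configuration of an individual search.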
31
forbidden_keys = {"to", "stderr", "log", "replace", "proceed"}  # NOTE(review): not referenced elsewhere in the visible code -- confirm a caller enforces it
32
33
# Schema of the table written by ``MultiSearch.write_docs``: one row per search,
# with every column required and typed as ``str``.
SearchExplainDf = (
    TypedDfs.typed("SearchExplainDf")
    .require("key", "search", "source", dtype=str)
    .require("desc", "args", dtype=str)
    .strict()
    .secure()
    .build()
)
40
41
42
def _no_duplicate_keys(self: AbsDf) -> Optional[str]:
43
    group = self[["key"]].groupby("key").count().to_dict()
44
    bad = {k for k, v in group.items() if v > 1}
45
    if len(bad) > 0:
46
        return f"Duplicate keys: {', '.join(bad)}"
47
    return None
48
49
50
def _no_illegal_cols(self: AbsDf) -> Optional[str]:
51
    illegal = {c for c in ["to", "path"] if c in self.columns}
52
    if len(illegal) > 0:
53
        return f"Illegal keys {', '.join(illegal)}"
54
    return None
55
56
57
# Schema of the multi-search configuration table: one row per search.
# ``key`` and ``source`` are required string columns; any other columns are
# forwarded as parameters of the search (see ``CmdRunner.build``).
# For TOML, the rows are read from and written to the ``search`` array of tables.
SearchConfigDf = (
    TypedDfs.typed("SearchConfigDf")
    .require("key", "source", dtype=str)
    .verify(_no_duplicate_keys)
    .verify(_no_illegal_cols)
    .add_read_kwargs("toml", aot="search")
    .add_write_kwargs("toml", aot="search")
    .secure()
).build()
67
68
69
@dataclass(frozen=True, repr=True)
class MultiSearch:
    """
    Runs every search listed in a config table on one input file, then concatenates the results.

    Attributes:
        config: One row per search; ``key`` names the search and ``source`` names the command
        input_path: The file passed to every search as its input
        out_dir: Directory for the per-search outputs, the concatenated result, and the doc file
        suffix: Appended to ``"search_" + input_path.name`` to name the concatenated result
        restart: Passed to each search as its ``replace`` parameter
        proceed: Passed to each search as its ``proceed`` parameter
        log_path: If set, per-search log paths are derived from it;
                  otherwise they are placed in ``out_dir``
    """

    config: SearchConfigDf
    input_path: Path
    out_dir: Path
    suffix: str
    restart: bool
    proceed: bool
    log_path: Optional[Path]

    @property
    def final_path(self) -> Path:
        """
        The path of the concatenated results file.
        """
        name = "search_" + self.input_path.name + self.suffix
        return self.out_dir / name

    @property
    def is_complete(self) -> bool:
        """
        Whether the checksum file that ``Checksums`` associates with ``final_path`` exists.
        """
        return Checksums().get_filesum_of_file(self.final_path).exists()

    @property
    def doc_path(self) -> Path:
        """
        The path of the table describing the searches: ``final_path`` without
        its last suffix, followed by ``_doc.tsv``.
        """
        return Path(str(self.final_path.with_suffix("")) + "_doc.tsv")

    def test(self) -> None:
        """
        Builds every search, writes the doc file, and calls ``test`` on each search.
        Does not run the searches.
        """
        self._build_and_test()

    def run(self) -> None:
        """
        Builds and tests every search, runs them in order, and writes the concatenated results.
        Does nothing further if the config contains no searches.
        """
        commands = self._build_and_test()
        if not commands:
            # _build_and_test already logged a warning;
            # pd.concat raises ValueError on an empty list, so there is nothing to write
            return
        # start!
        for cmd in commands:
            cmd.run()
        logger.notice("Done with all searches!")
        self._write_final(commands)

    def _write_final(self, commands: Sequence[CmdRunner]) -> None:
        """
        Reads the output of each search, concatenates them, and writes the result
        to ``final_path`` with the search keys and a timestamp as attributes.
        """
        hits = HitDf(pd.concat([HitDf.read_file(cmd.output_path) for cmd in commands]))
        now = datetime.now().isoformat(timespec="milliseconds")
        hits = hits.set_attrs(keys=[c.key for c in commands], written=now)
        hits.write_file(self.final_path, dir_hash=True, file_hash=True, attrs=True)
        logger.notice(f"Concatenated results to {self.final_path}")

    def _build_and_test(self) -> Sequence[CmdRunner]:
        """
        Builds the runner for each search, writes the doc file, and tests each runner.

        Returns:
            The runners, in config order; empty if the config has no searches

        Raises:
            Exception: Whatever a runner's ``test`` raised, after logging which search failed
        """
        # build up the list of Entry classes first, and run ``test`` on each one
        # that's to check that the parameters are correct before running anything
        logger.info("Building commands...")
        commands = self._build_commands()
        if not commands:
            logger.warning("No searches — nothing to do")
            return []
        # write a file describing all of the searches
        self.write_docs(commands)
        # build and test
        for cmd in commands:
            try:
                logger.info(f"Testing {cmd.key} ({cmd.cmd.__name__})")
                cmd.test()
            except Exception:
                logger.error(f"Bad search {cmd}")
                raise
        logger.success("Searches look ok")
        return commands

    def _build_commands(self) -> Sequence[CmdRunner]:
        """
        Builds one runner per config row, dropping cells that are ``None`` or NA.
        If two rows share a key, the later row replaces the earlier one.
        """
        commands = {}
        for _, row in self.config.iterrows():
            data = {k: v for k, v in row.to_dict().items() if v is not None and not pd.isna(v)}
            cmd = self._build_command(data)
            if cmd is not None:
                commands[cmd.key] = cmd
        return list(commands.values())

    def _build_command(self, data: MutableMapping[str, Any]) -> CmdRunner:
        """
        Fills in the output path, log path, and stderr setting for one search,
        then builds its runner.

        Args:
            data: The parameters from one config row; modified in place
        """
        key = data["key"]
        with logger.contextualize(key=key):
            default_to = self.out_dir / (key + SETTINGS.table_suffix)
            # not actually replacing -- we're just pretending so we can call adjust_filename
            data["to"] = EntryUtils.adjust_filename(
                None, default=default_to, replace=True, quiet=True
            )
            data["log"] = self._get_log_path(key)
            data["stderr"] = None  # MANDOS_SETUP.main.level
            cmd = CmdRunner.build(data, self.input_path, restart=self.restart, proceed=self.proceed)
        return cmd

    def write_docs(self, commands: Sequence[CmdRunner]) -> None:
        """
        Writes a table to ``doc_path`` with one row per search: its key, search name,
        primary data source, description, and parameters formatted as ``name="value"``.
        """
        rows = []
        for cmd in commands:
            search_type = cmd.cmd.get_search_type()
            args = " ".join([f'{k}="{v}"' for k, v in cmd.params.items()])
            row = dict(
                key=cmd.key,
                search=search_type.search_name(),
                source=search_type.primary_data_source(),
                desc=cmd.cmd.describe(),
                args=args,
            )
            rows.append(pd.Series(row))
        SearchExplainDf(rows).write_file(self.doc_path, mkdirs=True)

    def _get_log_path(self, key: str) -> Path:
        """
        Returns the log path for the search ``key``.

        If ``log_path`` is unset, this is ``out_dir / (key + SETTINGS.log_suffix)``.
        Otherwise, ``"_" + key`` is inserted before the suffix of ``log_path``.
        """
        if self.log_path is None:
            return self.out_dir / (key + SETTINGS.log_suffix)
        suffix = LOG_SETUP.guess_file_sink_info(self.log_path).suffix
        name = self.log_path.name
        # slice by length: ``name[: -len(suffix)]`` would yield "" for an empty suffix
        log_base = name[: len(name) - len(suffix)]
        return self.log_path.parent / (log_base + "_" + key + suffix)
178
179
180
@dataclass(frozen=True, repr=True)
class CmdRunner:
    """
    A single search: an Entry class plus the parameters and input file to call it with.

    Attributes:
        cmd: The Entry class whose ``test`` and ``run`` are called
        params: Keyword arguments passed to the entry; includes ``key`` and ``to``
        input_path: The input file, passed as the first positional argument
    """

    cmd: Type[Entry]
    params: MutableMapping[str, Union[int, str, float]]
    input_path: Path

    @property
    def key(self) -> str:
        """
        The ``key`` parameter, which identifies this search.
        """
        return self.params["key"]

    @property
    def output_path(self) -> Path:
        """
        The ``to`` parameter as a ``Path``.
        """
        destination = self.params["to"]
        return Path(destination)

    def test(self) -> None:
        """
        Calls the entry's ``test`` with the input path and parameters,
        with ``key`` added to the logging context.
        """
        with logger.contextualize(key=self.key):
            self.cmd.test(self.input_path, **self.params)

    def run(self) -> None:
        """
        Calls the entry's ``run`` with the input path and parameters,
        with ``key`` added to the logging context.
        """
        with logger.contextualize(key=self.key):
            self.cmd.run(self.input_path, **self.params)

    @classmethod
    def build(
        cls, data: Mapping[str, Any], input_path: Path, *, restart: bool, proceed: bool
    ) -> CmdRunner:
        """
        Creates a runner from one row of search configuration.

        Args:
            data: The parameters; ``key`` and ``source`` are required,
                  and ``source`` must be a key of ``EntriesByCmd``
            input_path: The input file for the search
            restart: Used as the ``replace`` parameter unless ``data`` has its own
            proceed: Used as the ``proceed`` parameter unless ``data`` has its own

        Raises:
            InjectionError: If ``source`` does not name a known command
        """
        key = data["key"]
        source = data["source"]
        try:
            entry = EntriesByCmd[source]
        except KeyError:
            raise InjectionError(f"Search command {source} (key {key}) does not exist") from None
        # Layered from lowest to highest priority: the defaults taken from the
        # OptionInfo instances, then our replace/proceed, then the row's own values.
        params = dict(entry.default_param_values())
        params.update(replace=restart, proceed=proceed)
        params.update((k, v) for k, v in data.items() if k != "source")
        return CmdRunner(entry, params, input_path)
219
220
221
# Public API of this module; ``CmdRunner`` and the verification helpers are not exported.
__all__ = ["MultiSearch", "SearchExplainDf", "SearchConfigDf"]
222