Passed
Push — main ( 2e1b6b...3a0c28 )
by Douglas
02:06
created

mandos.entry.tools.multi_searches.CmdRunner.key()   A

Complexity

Conditions 1

Size

Total Lines 3
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 3
nop 1
dl 0
loc 3
rs 10
c 0
b 0
f 0
1
"""
2
Runner.
3
"""
4
5
from __future__ import annotations
6
7
from dataclasses import dataclass
8
from datetime import datetime
9
from pathlib import Path
10
from typing import Any, Mapping, MutableMapping, Optional, Sequence, Type, Union
11
12
import pandas as pd
0 ignored issues
show
introduced by
Unable to import 'pandas'
Loading history...
13
import typer
0 ignored issues
show
introduced by
Unable to import 'typer'
Loading history...
14
from pocketutils.core.exceptions import InjectionError, PathExistsError
0 ignored issues
show
introduced by
Unable to import 'pocketutils.core.exceptions'
Loading history...
15
from typeddfs import Checksums, TypedDfs
0 ignored issues
show
introduced by
Unable to import 'typeddfs'
Loading history...
16
from typeddfs.abs_dfs import AbsDf
0 ignored issues
show
introduced by
Unable to import 'typeddfs.abs_dfs'
Loading history...
17
18
from mandos.entry.abstract_entries import Entry
19
from mandos.entry.api_singletons import Apis
0 ignored issues
show
Unused Code introduced by
Unused Apis imported from mandos.entry.api_singletons
Loading history...
20
from mandos.entry.entry_commands import Entries
21
from mandos.entry.utils._arg_utils import EntryUtils
22
from mandos.model.hit_dfs import HitDf
23
from mandos.model.settings import SETTINGS
24
from mandos.model.utils.setup import LOG_SETUP, logger
25
26
# Typer application object for this module.
# NOTE(review): no commands are registered on it in the visible code -- confirm where it is used.
cli = typer.Typer()
27
28
# Maps each entry's command name (``Entry.cmd()``) to its Entry class.
# ``CmdRunner.build`` uses it to resolve the ``source`` value of a search.
EntriesByCmd: MutableMapping[str, Type[Entry]] = {e.cmd(): e for e in Entries}
29
30
# these are not permitted in individual searches
31
# NOTE(review): this set is not referenced anywhere else in the visible code --
# confirm that it is enforced elsewhere.
forbidden_keys = {"to", "stderr", "log", "replace", "proceed"}
32
33
# Table documenting the searches that were run: one row per search,
# with every required column typed as ``str``.
SearchExplainDf = (
    TypedDfs.typed("SearchExplainDf")
    .require("key", "search", "source", dtype=str)
    .require("desc", "args", dtype=str)
    .strict()
    .secure()
    .build()
)
40
41
42
def _no_duplicate_keys(self: AbsDf) -> Optional[str]:
43
    group = self[["key"]].groupby("key").count().to_dict()
44
    bad = {k for k, v in group.items() if v > 1}
45
    if len(bad) > 0:
46
        return f"Duplicate keys: {', '.join(bad)}"
47
    return None
48
49
50
def _no_illegal_cols(self: AbsDf) -> Optional[str]:
51
    illegal = {c for c in ["to", "path"] if c in self.columns}
52
    if len(illegal) > 0:
53
        return f"Illegal keys {', '.join(illegal)}"
54
    return None
55
56
57
# Configuration table for a multi-search: one row per search.
# ``key`` and ``source`` are required; both verifiers defined above are attached.
# NOTE(review): ``aot="search"`` presumably reads/writes rows as a TOML array of
# tables named ``search`` -- confirm against the typeddfs documentation.
SearchConfigDf = (
    TypedDfs.typed("SearchConfigDf")
    .require("key", "source", dtype=str)
    .verify(_no_duplicate_keys)
    .verify(_no_illegal_cols)
    .add_read_kwargs("toml", aot="search")
    .add_write_kwargs("toml", aot="search")
    .secure()
    .build()
)
67
68
69
@dataclass(frozen=True, repr=True)
class MultiSearch:
    """
    A set of searches, described by a ``SearchConfigDf``, to run on one input file.

    Each row of ``config`` is turned into a ``CmdRunner``. Every runner is tested before
    any is run, and the per-search outputs are concatenated into ``final_path``.
    """

    config: SearchConfigDf
    input_path: Path
    out_dir: Path
    suffix: str
    restart: bool
    proceed: bool
    log_path: Optional[Path]

    @property
    def final_path(self) -> Path:
        """Path of the concatenated results: ``out_dir / ("search_" + input name + suffix)``."""
        name = "search_" + self.input_path.name + self.suffix
        return self.out_dir / name

    @property
    def is_complete(self) -> bool:
        """Whether the file-checksum path that ``Checksums`` gives for ``final_path`` exists."""
        return Checksums().get_filesum_of_file(self.final_path).exists()

    @property
    def doc_path(self) -> Path:
        """Path of the search-description table: ``final_path`` minus its last suffix + ``.doc.tsv``."""
        return Path(str(self.final_path.with_suffix("")) + ".doc.tsv")

    def test(self) -> None:
        """Build every search command and call its ``test``, without running any search."""
        self._build_and_test()

    def run(self) -> None:
        """
        Test and run every search, then write the concatenated results.

        Does nothing beyond logging a warning if the configuration contains no searches.

        Raises:
            PathExistsError: If ``final_path`` already exists and ``restart`` is false
        """
        # ``_write_final`` writes with ``overwrite=self.restart``, so an existing file
        # is only an error when we were not asked to restart
        if self.final_path.exists() and not self.restart:
            raise PathExistsError(f"{self.final_path} exists")
        commands = self._build_and_test()
        if not commands:
            # nothing to run, and ``pd.concat`` raises on an empty list
            return
        # start!
        for cmd in commands:
            cmd.run()
        logger.notice("Done with all searches!")
        self._write_final(commands)

    def _write_final(self, commands: Sequence[CmdRunner]) -> None:
        """Concatenate each command's output, write the doc table, then write ``final_path``."""
        hits = HitDf(pd.concat([HitDf.read_file(cmd.output_path) for cmd in commands]))
        now = datetime.now().isoformat(timespec="milliseconds")
        docs = self.get_docs(commands)
        SearchExplainDf([pd.Series(x) for x in docs]).write_file(self.doc_path)
        hits = hits.set_attrs(commands=docs, written=now)
        hits.write_file(
            self.final_path.resolve(),
            dir_hash=True,
            file_hash=True,
            attrs=True,
            overwrite=self.restart,
        )
        logger.notice(f"Concatenated results to {self.final_path}")

    def _build_and_test(self) -> Sequence[CmdRunner]:
        """
        Build the commands and call ``test`` on each one.

        This checks that the parameters are correct before anything is run.

        Returns:
            The built commands; an empty list if the configuration has no searches
        """
        logger.info("Building commands...")
        commands = self._build_commands()
        if not commands:
            logger.warning("No searches — nothing to do")
            return []
        for cmd in commands:
            try:
                logger.info(f"Testing {cmd.key} ({cmd.cmd.__name__})")
                cmd.test()
            except Exception:
                # log which search failed, then let the original error propagate
                logger.error(f"Bad search {cmd}")
                raise
        logger.success("Searches look ok")
        return commands

    def _build_commands(self) -> Sequence[CmdRunner]:
        """
        Build one command per row of ``config``, dropping ``None`` and NA cells.

        Commands are stored by key, so a later row with the same key replaces an earlier one.
        """
        commands = {}
        for i in range(len(self.config)):
            data = {
                k: v
                for k, v in self.config.iloc[i].to_dict().items()
                if v is not None and not pd.isna(v)
            }
            cmd = self._build_command(data)
            if cmd is not None:
                commands[cmd.key] = cmd
        return list(commands.values())

    def _build_command(self, data: MutableMapping[str, Any]) -> CmdRunner:
        """
        Build the command for one search.

        Adds ``to``, ``log``, and ``stderr`` entries to ``data`` (modifying it in place)
        before handing it to ``CmdRunner.build``.
        """
        key = data["key"]
        with logger.contextualize(key=key):
            default_to = self.out_dir / (key + SETTINGS.table_suffix)
            # not actually replacing -- we're just pretending so we can call adjust_filename
            data["to"] = EntryUtils.adjust_filename(
                None, default=default_to, replace=True, quiet=True
            )
            data["log"] = self._get_log_path(key)
            data["stderr"] = None  # MANDOS_SETUP.main.level
            cmd = CmdRunner.build(data, self.input_path, restart=self.restart, proceed=self.proceed)
        return cmd

    def get_docs(self, commands: Sequence[CmdRunner]) -> Sequence[Mapping[str, Any]]:
        """
        Describe each command as a mapping with keys ``key``, ``search``, ``source``,
        ``desc``, and ``args``.

        ``args`` is every parameter formatted as ``name="value"``, joined by single spaces.
        """
        rows = []
        for cmd in commands:
            search_type = cmd.cmd.get_search_type()
            name = search_type.search_name()
            src = search_type.primary_data_source()
            desc = cmd.cmd.describe()
            args = " ".join(f'{k}="{v}"' for k, v in cmd.params.items())
            rows.append(dict(key=cmd.key, search=name, source=src, desc=desc, args=args))
        return rows

    def _get_log_path(self, key: str) -> Path:
        """
        Get the log path for the search ``key``.

        With no ``log_path``, this is ``out_dir / (key + SETTINGS.log_suffix)``.
        Otherwise ``"_" + key`` is inserted before the suffix that ``LOG_SETUP``
        guesses for ``log_path``.
        """
        if self.log_path is None:
            return self.out_dir / (key + SETTINGS.log_suffix)
        suffix = LOG_SETUP.guess_file_sink_info(self.log_path).suffix
        name = self.log_path.name
        # ``name[:-0]`` is empty, so only strip when there is a suffix to strip
        log_base = name[: -len(suffix)] if suffix else name
        return self.log_path.parent / (log_base + "_" + key + suffix)
186
187
188
@dataclass(frozen=True, repr=True)
class CmdRunner:
    """
    One search command (an ``Entry`` class) bound to its parameters and input file.

    ``params`` must contain ``key``; ``output_path`` additionally requires ``to``.
    """

    cmd: Type[Entry]
    params: MutableMapping[str, Union[int, str, float]]
    input_path: Path

    @property
    def key(self) -> str:
        """The ``key`` parameter of this search."""
        return self.params["key"]

    @property
    def output_path(self) -> Path:
        """The ``to`` parameter of this search, as a ``Path``."""
        return Path(self.params["to"])

    def test(self) -> None:
        """Call ``cmd.test`` with the input path and all parameters, logging under this key."""
        with logger.contextualize(key=self.key):
            self.cmd.test(self.input_path, **self.params)

    def run(self) -> None:
        """Call ``cmd.run`` with the input path and all parameters, logging under this key."""
        with logger.contextualize(key=self.key):
            self.cmd.run(self.input_path, **self.params)

    @classmethod
    def build(
        cls, data: Mapping[str, Any], input_path: Path, *, restart: bool, proceed: bool
    ) -> CmdRunner:
        """
        Build a runner from one search definition.

        Args:
            data: Parameters of the search; must contain ``key`` and ``source``,
                  where ``source`` is a command name in ``EntriesByCmd``
            input_path: Path to the input file passed to the command
            restart: Passed to the command as ``replace``
            proceed: Passed to the command as ``proceed``

        Returns:
            A runner whose parameters are, in increasing precedence, the command's
            defaults, ``replace``/``proceed``, and every item of ``data`` except ``source``

        Raises:
            InjectionError: If ``source`` is not a known command
        """
        key, source = data["key"], data["source"]
        try:
            entry = EntriesByCmd[source]
        except KeyError:
            raise InjectionError(f"Search command {source} (key {key}) does not exist") from None
        # we need to explicitly add the defaults from the OptionInfo instances
        # add our new stuff after that
        params = {
            **entry.default_param_values(),
            "replace": restart,
            "proceed": proceed,
            **{k: v for k, v in data.items() if k != "source"},
        }
        # ``cls`` rather than ``CmdRunner`` so that subclasses build instances of themselves
        return cls(entry, params, input_path)
227
228
229
# Names exported by a star-import of this module.
# NOTE(review): ``CmdRunner`` and ``cli`` are not listed -- confirm that is intentional.
__all__ = ["MultiSearch", "SearchExplainDf", "SearchConfigDf"]
230