annif.project.AnnifProject._initialize_analyzer()   A
last analyzed

Complexity

Conditions 2

Size

Total Lines 6
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 6
nop 1
dl 0
loc 6
rs 10
c 0
b 0
f 0
1
"""Project management functionality for Annif"""
2
3
from __future__ import annotations
4
5
import enum
6
import os.path
7
import re
8
from shutil import rmtree
9
from typing import TYPE_CHECKING
10
11
import annif
12
import annif.analyzer
13
import annif.backend
14
import annif.corpus
15
import annif.transform
16
from annif.corpus import Document
17
from annif.datadir import DatadirMixin
18
from annif.exception import (
19
    AnnifException,
20
    ConfigurationException,
21
    NotEnabledException,
22
    NotInitializedException,
23
    NotSupportedException,
24
)
25
from annif.util import parse_args
26
from annif.vocab import SubjectIndexFilter, kwargs_to_exclude_uris
27
28
if TYPE_CHECKING:
29
    from collections import defaultdict
30
    from configparser import SectionProxy
31
    from datetime import datetime
32
33
    from click.utils import LazyFile
34
35
    from annif.analyzer import Analyzer
36
    from annif.backend import AnnifBackend
37
    from annif.backend.hyperopt import HPRecommendation
38
    from annif.corpus import DocumentCorpus
39
    from annif.corpus.subject import SubjectIndex
40
    from annif.registry import AnnifRegistry
41
    from annif.transform.transform import TransformChain
42
    from annif.vocab import AnnifVocabulary
43
44
# Reuse the logger object exposed by the top-level annif package.
logger = annif.logger
45
46
47
class Access(enum.IntEnum):
    """Enumeration of access levels for projects.

    The members are integers, so they can be compared with the usual
    ordering operators (``private < hidden < public``).
    """

    private = 1
    hidden = 2
    public = 3
53
54
55
class AnnifProject(DatadirMixin):
    """Class representing the configuration of a single Annif project.

    The analyzer, transform, backend, vocabulary and subject index are
    created on first access through the corresponding properties and then
    cached on the instance.
    """

    # defaults for uninitialized instances
    _transform = None
    _analyzer = None
    _backend = None
    _vocab = None
    _vocab_lang = None
    # class-level default; _initialize_vocab() rebinds it per instance
    _vocab_kwargs = {}
    _subject_index = None
    initialized = False

    # default values for configuration settings
    DEFAULT_ACCESS = "public"

    def __init__(
        self,
        project_id: str,
        config: dict[str, str] | SectionProxy,
        datadir: str,
        registry: AnnifRegistry,
    ) -> None:
        """Store the settings of the project identified by ``project_id``.

        ``config`` must contain a ``language`` entry (it is looked up by
        indexing); ``name``, ``analyzer``, ``transform``, ``vocab`` and
        ``access`` are optional and fall back to defaults.

        Raises ConfigurationException if the ``access`` setting is not the
        name of an access level.
        """
        DatadirMixin.__init__(self, datadir, "projects", project_id)
        self.project_id = project_id
        self.name = config.get("name", project_id)
        self.language = config["language"]
        self.analyzer_spec = config.get("analyzer", None)
        self.transform_spec = config.get("transform", "pass")
        self.vocab_spec = config.get("vocab", None)
        self.config = config
        self._base_datadir = datadir
        self.registry = registry
        self._init_access()

    def _init_access(self) -> None:
        """Resolve the ``access`` setting into a member of ``Access``.

        Raises ConfigurationException when the configured value is not the
        name of an access level.
        """
        access = self.config.get("access", self.DEFAULT_ACCESS)
        try:
            # Look the name up among the enum members only: getattr() would
            # also accept unrelated class attributes such as "mro".
            self.access = Access[access]
        except KeyError as err:
            raise ConfigurationException(
                "'{}' is not a valid access setting".format(access),
                project_id=self.project_id,
            ) from err

    def _initialize_analyzer(self) -> None:
        """Create the analyzer, if one has been configured, and log it."""
        if not self.analyzer_spec:
            return  # not configured, so assume it's not needed
        analyzer = self.analyzer
        logger.debug(
            "Project '%s': initialized analyzer: %s", self.project_id, str(analyzer)
        )

    def _initialize_subjects(self) -> None:
        """Load the subject index; an AnnifException is logged as a warning."""
        try:
            subjects = self.subjects
            logger.debug(
                "Project '%s': initialized subjects: %s", self.project_id, str(subjects)
            )
        except AnnifException as err:
            logger.warning(err.format_message())

    def _initialize_backend(self, parallel: bool) -> None:
        """Create and initialize the backend.

        A missing backend is only logged, and an AnnifException raised
        along the way is logged as a warning instead of being propagated.
        """
        logger.debug("Project '%s': initializing backend", self.project_id)
        try:
            if not self.backend:
                logger.debug("Cannot initialize backend: does not exist")
                return
            self.backend.initialize(parallel)
        except AnnifException as err:
            logger.warning(err.format_message())

    def initialize(self, parallel: bool = False) -> None:
        """Initialize this project and its backend so that they are ready to
        be used. If parallel is True, expect that the project will be used
        for parallel processing."""

        if self.initialized:
            return

        logger.debug("Initializing project '%s'", self.project_id)

        self._initialize_analyzer()
        self._initialize_subjects()
        self._initialize_backend(parallel)

        self.initialized = True

    def _suggest_with_backend(
        self,
        docs: list[Document],
        backend_params: defaultdict[str, dict] | None,
    ) -> annif.suggestion.SuggestionBatch:
        """Pass ``docs`` to the backend together with the parameters that
        ``backend_params`` holds under this backend's id (if any)."""
        if backend_params is None:
            backend_params = {}
        beparams = backend_params.get(self.backend.backend_id, {})
        return self.backend.suggest(docs, beparams)

    @property
    def analyzer(self) -> Analyzer:
        """The analyzer built from the ``analyzer`` setting.

        Raises ConfigurationException if the setting is missing.
        """
        if self._analyzer is None:
            if self.analyzer_spec:
                self._analyzer = annif.analyzer.get_analyzer(self.analyzer_spec)
            else:
                raise ConfigurationException(
                    "analyzer setting is missing", project_id=self.project_id
                )
        return self._analyzer

    @property
    def transform(self) -> TransformChain:
        """The transform built from the ``transform`` setting."""
        if self._transform is None:
            self._transform = annif.transform.get_transform(
                self.transform_spec, project=self
            )
        return self._transform

    @property
    def backend(self) -> AnnifBackend | None:
        """The backend named by the ``backend`` setting.

        Returns None (after logging a warning) when creating the backend
        fails with a ValueError. Raises ConfigurationException if the
        setting is missing.
        """
        if self._backend is None:
            if "backend" not in self.config:
                raise ConfigurationException(
                    "backend setting is missing", project_id=self.project_id
                )
            backend_id = self.config["backend"]
            try:
                backend_class = annif.backend.get_backend(backend_id)
                self._backend = backend_class(
                    backend_id, config_params=self.config, project=self
                )
            except ValueError:
                logger.warning(
                    "Could not create backend %s, "
                    "make sure you've installed optional dependencies",
                    backend_id,
                )
        return self._backend

    def _initialize_vocab(self) -> None:
        """Parse the ``vocab`` setting and fetch the vocabulary from the registry.

        The setting has the form ``vocab_id`` or ``vocab_id(args)``. The
        first positional argument, when present, is used as the vocabulary
        language (the project language otherwise); keyword arguments are
        kept in ``_vocab_kwargs``.

        Raises ConfigurationException if the setting is missing and
        ValueError if it does not match the expected form.
        """
        if self.vocab_spec is None:
            raise ConfigurationException(
                "vocab setting is missing", project_id=self.project_id
            )

        match = re.match(r"([\w-]+)(\((.*)\))?$", self.vocab_spec)
        if match is None:
            raise ValueError(f"Invalid vocabulary specification: {self.vocab_spec}")
        vocab_id = match.group(1)
        posargs, self._vocab_kwargs = parse_args(match.group(3))
        self._vocab_lang = posargs[0] if posargs else self.language
        self._vocab = self.registry.get_vocab(vocab_id)

    @property
    def vocab(self) -> AnnifVocabulary:
        """The vocabulary referred to by the ``vocab`` setting."""
        if self._vocab is None:
            self._initialize_vocab()
        return self._vocab

    @property
    def vocab_lang(self) -> str:
        """The vocabulary language taken from the ``vocab`` setting, or the
        project language when the setting does not specify one."""
        if self._vocab_lang is None:
            self._initialize_vocab()
        return self._vocab_lang

    @property
    def subjects(self) -> SubjectIndex:
        """The subject index of the vocabulary, wrapped in a
        SubjectIndexFilter when the vocab arguments exclude any URIs."""
        if self._subject_index is None:
            self._subject_index = self.vocab.subjects
            exclude_uris = kwargs_to_exclude_uris(self.vocab, self._vocab_kwargs)
            if exclude_uris:
                self._subject_index = SubjectIndexFilter(
                    self._subject_index, exclude=exclude_uris
                )
        return self._subject_index

    def _get_info(self, key: str) -> bool | datetime | None:
        """Return attribute ``key`` of the backend.

        Returns None when there is no backend, or when an AnnifException is
        raised while obtaining the value (the exception is logged as a
        warning).
        """
        try:
            be = self.backend
            if be is None:
                return None
            return getattr(be, key)
        except AnnifException as err:
            logger.warning(err.format_message())
            return None

    @property
    def is_trained(self) -> bool | None:
        """The backend's ``is_trained`` value, or None if unavailable."""
        return self._get_info("is_trained")

    @property
    def modification_time(self) -> datetime | None:
        """The backend's ``modification_time`` value, or None if unavailable."""
        return self._get_info("modification_time")

    def suggest_corpus(
        self,
        corpus: DocumentCorpus,
        backend_params: defaultdict[str, dict] | None = None,
    ) -> annif.suggestion.SuggestionResults:
        """Suggest subjects for the given documents corpus in batches of documents."""
        # Generator expression: suggest() runs only as the batches are consumed.
        suggestions = (
            self.suggest(doc_batch, backend_params) for doc_batch in corpus.doc_batches
        )
        # NOTE(review): imported locally rather than at module level,
        # presumably to avoid an import cycle - confirm before moving it.
        import annif.suggestion

        return annif.suggestion.SuggestionResults(suggestions)

    def suggest(
        self,
        documents: list[Document],
        backend_params: defaultdict[str, dict] | None = None,
    ) -> annif.suggestion.SuggestionBatch:
        """Suggest subjects for the given documents batch.

        Raises NotInitializedException if the backend reports that it is
        not trained. When the train state cannot be determined, a warning
        is logged and the suggestion is attempted anyway.
        """
        # Read the property once: every access queries the backend again and
        # may log a warning of its own.
        trained = self.is_trained
        if trained is None:
            logger.warning("Could not get train state information.")
        elif not trained:
            raise NotInitializedException("Project is not trained.")
        transformed_docs = [self.transform.transform_doc(doc) for doc in documents]
        return self._suggest_with_backend(transformed_docs, backend_params)

    def train(
        self,
        corpus: DocumentCorpus,
        backend_params: defaultdict[str, dict] | None = None,
        jobs: int = 0,
    ) -> None:
        """train the project using documents from a metadata source

        The special value "cached" is handed to the backend as is; any
        other corpus is first run through the project's transform.
        """
        if corpus != "cached":
            corpus = self.transform.transform_corpus(corpus)
        if backend_params is None:
            backend_params = {}
        beparams = backend_params.get(self.backend.backend_id, {})
        self.backend.train(corpus, beparams, jobs)

    def learn(
        self,
        corpus: DocumentCorpus,
        backend_params: defaultdict[str, dict] | None = None,
    ) -> None:
        """further train the project using documents from a metadata source

        Raises NotSupportedException if the backend is not an
        AnnifLearningBackend, and NotEnabledException if the ``allow_learn``
        setting does not evaluate to true.
        """
        if backend_params is None:
            backend_params = {}
        beparams = backend_params.get(self.backend.backend_id, {})
        corpus = self.transform.transform_corpus(corpus)
        if isinstance(self.backend, annif.backend.backend.AnnifLearningBackend):
            if annif.util.boolean(self.config.get("allow_learn", False)):
                self.backend.learn(corpus, beparams)
            else:
                raise NotEnabledException(
                    "Learning not enabled for project", project_id=self.project_id
                )
        else:
            raise NotSupportedException(
                "Learning not supported by backend", project_id=self.project_id
            )

    def hyperopt(
        self,
        corpus: DocumentCorpus,
        trials: int,
        jobs: int,
        metric: str,
        results_file: LazyFile | None,
    ) -> HPRecommendation:
        """optimize the hyperparameters of the project using a validation
        corpus against a given metric

        Raises NotSupportedException if the backend is not an
        AnnifHyperoptBackend.
        """
        if isinstance(self.backend, annif.backend.hyperopt.AnnifHyperoptBackend):
            optimizer = self.backend.get_hp_optimizer(corpus, metric)
            return optimizer.optimize(trials, jobs, results_file)

        raise NotSupportedException(
            "Hyperparameter optimization not supported by backend",
            project_id=self.project_id,
        )

    def dump(self) -> dict[str, str | dict | bool | datetime | None]:
        """return this project as a dict

        The ``vocab`` and ``vocab_language`` entries are None when looking
        up the vocabulary raises a ConfigurationException.
        """

        try:
            vocab = {
                "vocab_id": self.vocab.vocab_id,
                "languages": sorted(self.vocab.languages),
            }
            vocab_lang = self.vocab_lang
        except ConfigurationException:
            vocab = None
            vocab_lang = None

        return {
            "project_id": self.project_id,
            "name": self.name,
            "language": self.language,
            "backend": {"backend_id": self.config.get("backend")},
            "vocab": vocab,
            "vocab_language": vocab_lang,
            "is_trained": self.is_trained,
            "modification_time": self.modification_time,
        }

    def remove_model_data(self) -> None:
        """remove the data of this project

        Deletes the project's data directory if it exists; otherwise only
        logs a warning.
        """
        datadir_path = self._datadir_path
        if os.path.isdir(datadir_path):
            rmtree(datadir_path)
            logger.info("Removed model data for project %s.", self.project_id)
        else:
            logger.warning("No model data to remove for project %s.", self.project_id)
363