annif.backend.tfidf.SubjectBuffer.write() (grade A)

Complexity: 2 conditions
Size: 4 total lines, 4 code lines
Duplication: 0 lines (0 %)
Importance: 0 changes

Raw metrics: eloc 4, dl 0, loc 4, rs 10, c 0, b 0, f 0, cc 2, nop 2

"""Backend that returns most similar subjects based on similarity in sparse
TF-IDF normalized bag-of-words vector space"""

from __future__ import annotations

import os.path
import tempfile
from typing import TYPE_CHECKING, Any

import gensim.similarities
from gensim.matutils import Sparse2Corpus

import annif.util
from annif.exception import NotInitializedException, NotSupportedException
from annif.suggestion import vector_to_suggestions

from . import backend, mixins

if TYPE_CHECKING:
    from collections.abc import Iterator

    from scipy.sparse._csr import csr_matrix

    from annif.corpus import Document, DocumentCorpus


class SubjectBuffer:
    """A file-backed buffer to store and retrieve subject text."""

    BUFFER_SIZE = 100

    def __init__(self, tempdir: str, subject_id: int) -> None:
        filename = "{:08d}.txt".format(subject_id)
        self._path = os.path.join(tempdir, filename)
        self._buffer = []
        self._created = False

    def flush(self) -> None:
        if self._created:
            mode = "a"
        else:
            mode = "w"

        with open(self._path, mode, encoding="utf-8") as subjfile:
            for text in self._buffer:
                print(text, file=subjfile)

        self._buffer = []
        self._created = True

    def write(self, text: str) -> None:
        self._buffer.append(text)
        if len(self._buffer) >= self.BUFFER_SIZE:
            self.flush()

    def read(self) -> str:
        if not self._created:
            # file was never created - we can simply return the buffer content
            return "\n".join(self._buffer)
        else:
            with open(self._path, "r", encoding="utf-8") as subjfile:
                return subjfile.read() + "\n" + "\n".join(self._buffer)
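
# A minimal usage sketch (illustrative, not part of the original module):
# SubjectBuffer keeps up to BUFFER_SIZE lines in memory and spills them to a
# per-subject text file on flush, so read() may combine file and buffer content.
#
#     with tempfile.TemporaryDirectory() as tmpdir:
#         buf = SubjectBuffer(tmpdir, subject_id=42)
#         for _ in range(150):
#             buf.write("tokenized document text")  # 100th write triggers flush()
#         text = buf.read()  # file contents plus the 50 still-buffered lines

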
class TFIDFBackend(mixins.TfidfVectorizerMixin, backend.AnnifBackend):
    """TF-IDF vector space similarity based backend for Annif"""

    name = "tfidf"

    # defaults for uninitialized instances
    _index = None

    INDEX_FILE = "tfidf-index"

    def _generate_subjects_from_documents(
        self, corpus: DocumentCorpus
    ) -> Iterator[str]:
        with tempfile.TemporaryDirectory() as tempdir:
            subject_buffer = {}
            for subject_id in range(len(self.project.subjects)):
                subject_buffer[subject_id] = SubjectBuffer(tempdir, subject_id)

            for doc in corpus.documents:
                tokens = self.project.analyzer.tokenize_words(doc.text)
                for subject_id in doc.subject_set:
                    subject_buffer[subject_id].write(" ".join(tokens))

            for sid in range(len(self.project.subjects)):
                yield subject_buffer[sid].read()
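
    # The generator above builds one pseudo-document per subject: the tokenized
    # text of every training document is appended to each subject it is labeled
    # with, and the concatenated texts are yielded in subject-ID order. For
    # example (made-up data), two documents both tagged with subject 0 yield a
    # single string "cats felines pets\ndogs canines pets" for that subject.
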
    def _initialize_index(self) -> None:
        if self._index is None:
            path = os.path.join(self.datadir, self.INDEX_FILE)
            self.debug("loading similarity index from {}".format(path))
            if os.path.exists(path):
                self._index = gensim.similarities.SparseMatrixSimilarity.load(path)
            else:
                raise NotInitializedException(
                    "similarity index {} not found".format(path),
                    backend_id=self.backend_id,
                )

    def initialize(self, parallel: bool = False) -> None:
        self.initialize_vectorizer()
        self._initialize_index()
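
    # initialize() presupposes a completed training run: the vectorizer is
    # loaded by the mixin and the gensim similarity index from self.datadir;
    # a missing index file raises NotInitializedException.
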
    def _create_index(self, veccorpus: csr_matrix) -> None:
        self.info("creating similarity index")
        gscorpus = Sparse2Corpus(veccorpus, documents_columns=False)
        self._index = gensim.similarities.SparseMatrixSimilarity(
            gscorpus, num_features=len(self.vectorizer.vocabulary_)
        )
        annif.util.atomic_save(self._index, self.datadir, self.INDEX_FILE)
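
    # documents_columns=False makes gensim's Sparse2Corpus treat each row of
    # the vectorized matrix as one document (here: one subject), matching the
    # row-per-document layout produced by scikit-learn style vectorizers.
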
    def _train(
        self,
        corpus: DocumentCorpus,
        params: dict[str, Any],
        jobs: int = 0,
    ) -> None:
        if corpus == "cached":
            raise NotSupportedException(
                "Training tfidf project from cached data not supported."
            )
        if corpus.is_empty():
            raise NotSupportedException("Cannot train tfidf project with no documents")
        self.info("transforming subject corpus")
        subjects = self._generate_subjects_from_documents(corpus)
        veccorpus = self.create_vectorizer(subjects)
        self._create_index(veccorpus)
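
    # Training is thus a three-step pipeline: build one pseudo-document per
    # subject, fit a TF-IDF vectorizer on those pseudo-documents (via the
    # mixin's create_vectorizer), and store the resulting matrix in a gensim
    # SparseMatrixSimilarity index saved atomically to the data directory.
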
    def _suggest(self, doc: Document, params: dict[str, Any]) -> Iterator:
        self.debug(
            'Suggesting subjects for text "{}..." (len={})'.format(
                doc.text[:20], len(doc.text)
            )
        )
        tokens = self.project.analyzer.tokenize_words(doc.text)
        vectors = self.vectorizer.transform([" ".join(tokens)])
        return vector_to_suggestions(self._index[vectors[0]], int(params["limit"]))
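

# A standalone sketch of the retrieval approach used by this backend, outside
# the Annif project machinery (illustrative only; assumes scikit-learn and
# gensim are installed, and the subject texts and query are made-up data).
if __name__ == "__main__":
    from sklearn.feature_extraction.text import TfidfVectorizer

    # One pseudo-document per subject, as _generate_subjects_from_documents
    # would yield them.
    subject_texts = [
        "cats felines pets fur whiskers",
        "dogs canines pets leash bark",
    ]
    vectorizer = TfidfVectorizer()
    matrix = vectorizer.fit_transform(subject_texts)

    # Rows of the matrix are documents (subjects), hence documents_columns=False.
    demo_index = gensim.similarities.SparseMatrixSimilarity(
        Sparse2Corpus(matrix, documents_columns=False),
        num_features=len(vectorizer.vocabulary_),
    )

    # Vectorize a query the same way and rank subjects by similarity score.
    query = vectorizer.transform(["my pet cat has soft fur"])
    for subject_id, score in enumerate(demo_index[query[0]]):
        print(subject_id, score)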