annif.corpus.document.DocumentFileCSV.documents()   B
last analyzed

Complexity

Conditions 6

Size

Total Lines 23
Code Lines 19

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 6
eloc 19
nop 1
dl 0
loc 23
rs 8.5166
c 0
b 0
f 0
1
"""Classes for supporting document corpora"""
2
3
from __future__ import annotations
4
5
import csv
6
import glob
7
import gzip
8
import os.path
9
import re
10
from itertools import islice
11
from typing import TYPE_CHECKING
12
13
import annif.util
14
from annif.exception import OperationFailedException
15
16
from .json import json_file_to_document, json_to_document
17
from .types import Document, DocumentCorpus, SubjectSet
18
19
if TYPE_CHECKING:
20
    from collections.abc import Iterator
21
22
    from annif.vocab import SubjectIndex
23
24
# Module-level shortcut to the logger object exposed by the annif package.
logger = annif.logger
25
26
27
class DocumentDirectory(DocumentCorpus):
    """Full text document corpus backed by the files of one directory.

    Iterating over the corpus yields the paths of the ``*.txt`` and
    ``*.json`` files found directly in the directory; the ``documents``
    property turns those files into Document objects.
    """

    def __init__(
        self,
        path: str,
        subject_index: SubjectIndex | None = None,
        language: str | None = None,
        require_subjects: bool = False,
    ) -> None:
        """Store the directory path and the settings used when reading files.

        When require_subjects is true, txt files without an accompanying
        subject file are left out of the corpus.
        """
        self.path = path
        self.subject_index = subject_index
        self.language = language
        self.require_subjects = require_subjects

    def __iter__(self) -> Iterator[str]:
        """Yield corpus file paths: sorted txt files first, then sorted json
        files."""

        for pattern in ("*.txt", "*.json"):
            yield from sorted(glob.glob(os.path.join(self.path, pattern)))

    @staticmethod
    def _get_subject_filename(filename: str) -> str | None:
        """Return the path of the subject file belonging to a txt file.

        A ``.tsv`` sibling is preferred over a ``.key`` sibling; None is
        returned when neither exists.
        """

        for extension in (".tsv", ".key"):
            candidate = re.sub(r"\.txt$", extension, filename)
            if os.path.exists(candidate):
                return candidate
        return None

    def _read_txt_file(self, filename: str) -> Document | None:
        """Read one txt file (and, if required, its subjects) into a Document.

        Returns None when subjects are required but no subject file exists.
        """

        # undecodable bytes are replaced rather than aborting the whole corpus
        with open(filename, errors="replace", encoding="utf-8-sig") as docfile:
            text = docfile.read()

        subjects = None
        if self.require_subjects:
            subjfilename = self._get_subject_filename(filename)
            if subjfilename is None:
                # subjects required but not found, skipping this docfile
                return None
            with open(subjfilename, encoding="utf-8-sig") as subjfile:
                subjects = SubjectSet.from_string(
                    subjfile.read(), self.subject_index, self.language
                )

        return Document(text=text, subject_set=subjects, file_path=filename)

    @property
    def documents(self) -> Iterator[Document]:
        """Yield a Document for every corpus file that could be read.

        Files for which the reader returns None are silently skipped.
        """

        for docfilename in self:
            doc = (
                self._read_txt_file(docfilename)
                if docfilename.endswith(".txt")
                else json_file_to_document(
                    docfilename,
                    self.subject_index,
                    self.language,
                    self.require_subjects,
                )
            )
            if doc is None:
                continue
            yield doc
97
98
99
class DocumentFileTSV(DocumentCorpus):
    """A TSV file as a corpus of documents with subjects

    Each line holds the document text, a tab character and a
    whitespace-separated list of subject URIs. A path ending in ".gz"
    (compared case-insensitively) is read as gzip-compressed text.
    """

    def __init__(
        self, path: str, subject_index: SubjectIndex, require_subjects: bool = True
    ) -> None:
        """Store the file path, the subject index used to resolve URIs and
        whether lines without subjects should be skipped."""
        self.path = path
        self.subject_index = subject_index
        self.require_subjects = require_subjects

    @property
    def documents(self) -> Iterator[Document]:
        """Yield the documents parsed from the lines of the TSV file."""

        # Compare in lower case so that e.g. "CORPUS.TSV.GZ" is also
        # recognized as a gzip file instead of being decoded as plain text.
        opener = gzip.open if self.path.lower().endswith(".gz") else open
        with opener(self.path, mode="rt", encoding="utf-8-sig") as tsvfile:
            for line in tsvfile:
                yield from self._parse_tsv_line(line)

    def _parse_tsv_line(self, line: str) -> Iterator[Document]:
        """Yield at most one Document for a single line of the file.

        A line without a tab is skipped with a warning when subjects are
        required; otherwise the whole stripped line becomes the text of a
        Document without subjects.
        """

        if "\t" in line:
            # only the first tab separates text from URIs
            text, uris = line.split("\t", maxsplit=1)
            subject_ids = {
                self.subject_index.by_uri(annif.util.cleanup_uri(uri))
                for uri in uris.split()
            }
            yield Document(text=text, subject_set=SubjectSet(subject_ids))
        elif self.require_subjects:
            logger.warning('Skipping invalid line (missing tab): "%s"', line.rstrip())
        else:
            yield Document(text=line.strip())
134
135
136
class DocumentFileCSV(DocumentCorpus):
    """A CSV file as a corpus of documents with subjects

    The file must start with a header row. The "text" column is mandatory,
    "subject_uris" is mandatory when subjects are required, "document_id" is
    optional, and every other column is passed on as document metadata.
    A path ending in ".gz" (compared case-insensitively) is read as
    gzip-compressed text.
    """

    def __init__(
        self, path: str, subject_index: SubjectIndex, require_subjects: bool = True
    ) -> None:
        """Store the file path, the subject index used to resolve URIs and
        whether the "subject_uris" column is mandatory."""
        self.path = path
        self.subject_index = subject_index
        self.require_subjects = require_subjects

    @property
    def documents(self) -> Iterator[Document]:
        """Yield one Document per data row of the CSV file.

        Raises OperationFailedException, on first iteration, when the header
        row lacks the mandatory columns.
        """

        # Compare in lower case: is_csv_file() accepts e.g. "X.CSV.GZ", so
        # such a path must also be opened as gzip here.
        opener = gzip.open if self.path.lower().endswith(".gz") else open
        # newline="" is what the csv module asks for; it lets the reader
        # handle line breaks embedded in quoted fields itself.
        with opener(
            self.path, mode="rt", encoding="utf-8-sig", newline=""
        ) as csvfile:
            reader = csv.DictReader(csvfile)
            if not self._check_fields(reader):
                if self.require_subjects:
                    required = "the columns 'text' and 'subject_uris'."
                else:
                    required = "the column 'text'."
                raise OperationFailedException(
                    f"Cannot parse CSV file {self.path}. "
                    + "The file must have a header row that defines at least "
                    + required
                )
            for row in reader:
                yield from self._parse_row(row)

    def _parse_row(self, row: dict[str, str]) -> Iterator[Document]:
        """Yield the Document described by one CSV row.

        Subject URIs are resolved when subjects are required, or when a
        subject index is available and the row has a "subject_uris" column;
        otherwise the document gets an empty subject set.
        """

        resolve_subjects = self.require_subjects or (
            self.subject_index is not None and "subject_uris" in row
        )
        if resolve_subjects:
            # a short row leaves the value as None, hence the `or ""`
            subject_ids = {
                self.subject_index.by_uri(annif.util.cleanup_uri(uri))
                for uri in (row["subject_uris"] or "").strip().split()
            }
        else:
            subject_ids = set()

        # everything that is not a reserved column is kept as metadata
        reserved = ("document_id", "text", "subject_uris")
        metadata = {key: val for key, val in row.items() if key not in reserved}

        yield Document(
            text=(row["text"] or ""),
            subject_set=SubjectSet(subject_ids),
            metadata=metadata,
            document_id=row.get("document_id"),
        )

    def _check_fields(self, reader: csv.DictReader) -> bool:
        """Return True if the header row defines all mandatory columns."""

        fns = reader.fieldnames
        if fns is None:
            # no header row at all (e.g. an empty file)
            return False
        required = ("text", "subject_uris") if self.require_subjects else ("text",)
        return all(name in fns for name in required)

    @staticmethod
    def is_csv_file(path: str) -> bool:
        """return True if the path looks like a CSV file"""

        return path.lower().endswith((".csv", ".csv.gz"))
205
206
207
class DocumentFileJSONL(DocumentCorpus):
    """A JSON Lines file as a corpus of documents with subjects

    Every line of the file is handed to json_to_document(); lines for which
    it returns None are left out of the corpus. A path ending in ".gz"
    (compared case-insensitively) is read as gzip-compressed text.
    """

    def __init__(
        self,
        path: str,
        subject_index: SubjectIndex,
        language: str,
        require_subjects: bool = True,
    ) -> None:
        """Store the file path and the settings passed on to
        json_to_document() for every line."""
        self.path = path
        self.subject_index = subject_index
        self.language = language
        self.require_subjects = require_subjects

    @property
    def documents(self) -> Iterator[Document]:
        """Yield the documents parsed from the lines of the JSONL file."""

        # Compare in lower case: is_jsonl_file() accepts e.g. "X.JSONL.GZ",
        # so such a path must also be opened as gzip here.
        opener = gzip.open if self.path.lower().endswith(".gz") else open
        # utf-8-sig drops a leading byte order mark if there is one, so that
        # it does not end up at the start of the first JSON line; files
        # without a BOM are decoded exactly as with plain utf-8.
        with opener(self.path, mode="rt", encoding="utf-8-sig") as jsonlfile:
            for line in jsonlfile:
                doc = json_to_document(
                    self.path,
                    line,
                    self.subject_index,
                    self.language,
                    self.require_subjects,
                )
                if doc is not None:
                    yield doc

    @staticmethod
    def is_jsonl_file(path: str) -> bool:
        """return True if the path looks like a JSONL file"""

        return path.lower().endswith((".jsonl", ".jsonl.gz"))
246
247
248
class DocumentList(DocumentCorpus):
    """A document corpus based on a list or other iterable of Document
    objects"""

    def __init__(self, documents):
        # kept as given; it is iterated anew on every access to `documents`
        self._documents = documents

    @property
    def documents(self):
        """Yield the wrapped Document objects one by one."""
        for doc in self._documents:
            yield doc
258
259
260
class TransformingDocumentCorpus(DocumentCorpus):
    """A document corpus wrapping another corpus whose documents are passed
    through a given transform function before being yielded"""

    def __init__(self, corpus, transform_fn):
        self._orig_corpus = corpus
        self._transform_fn = transform_fn

    @property
    def documents(self):
        """Yield the result of the transform function for each document of
        the wrapped corpus, in the original order."""
        yield from (
            self._transform_fn(doc) for doc in self._orig_corpus.documents
        )
272
273
274
class LimitingDocumentCorpus(DocumentCorpus):
    """A document corpus wrapping another corpus of which at most a given
    number of documents is exposed"""

    def __init__(self, corpus, docs_limit):
        self._orig_corpus = corpus
        self.docs_limit = docs_limit

    @property
    def documents(self):
        """Yield documents of the wrapped corpus, stopping after docs_limit
        documents."""
        yield from islice(self._orig_corpus.documents, self.docs_limit)
286