annif.corpus.document.DocumentFileCSV._parse_row() — grade: A
Last analyzed

Complexity

Conditions 4

Size

Total Lines 20
Code Lines 16

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 4
eloc 16
nop 2
dl 0
loc 20
rs 9.6
c 0
b 0
f 0
1
"""Classes for supporting document corpora"""
2
3
from __future__ import annotations
4
5
import csv
6
import glob
7
import gzip
8
import os.path
9
import re
10
from itertools import islice
11
from typing import TYPE_CHECKING
12
13
import annif.util
14
from annif.exception import OperationFailedException
15
16
from .json import json_file_to_document, json_to_document
17
from .types import Document, DocumentCorpus, SubjectSet
18
19
if TYPE_CHECKING:
20
    from collections.abc import Iterator
21
22
    from annif.vocab import SubjectIndex
23
24
logger = annif.logger
25
26
csv.field_size_limit(2147483647)  # Increase field size limit to 2 GB
27
28
29
class DocumentDirectory(DocumentCorpus):
    """A directory of files as a full text document corpus"""

    def __init__(
        self,
        path: str,
        subject_index: SubjectIndex | None = None,
        language: str | None = None,
        require_subjects: bool = False,
    ) -> None:
        self.path = path
        self.subject_index = subject_index
        self.language = language
        self.require_subjects = require_subjects

    def __iter__(self) -> Iterator[str]:
        """Iterate through the directory, yielding file paths with corpus documents."""

        # All .txt files first, then all .json files, each group sorted by name.
        for pattern in ("*.txt", "*.json"):
            yield from sorted(glob.glob(os.path.join(self.path, pattern)))

    @staticmethod
    def _get_subject_filename(filename: str) -> str | None:
        """Return the companion subject file for a .txt document, or None.

        A .tsv file with the same stem takes precedence over a .key file."""

        for extension in (".tsv", ".key"):
            candidate = re.sub(r"\.txt$", extension, filename)
            if os.path.exists(candidate):
                return candidate
        return None

    def _read_txt_file(self, filename: str) -> Document | None:
        """Read one .txt document, attaching subjects when they are required.

        Returns None when subjects are required but no companion subject file
        exists, so the caller can skip the document."""

        with open(filename, errors="replace", encoding="utf-8-sig") as docfile:
            text = docfile.read()

        if not self.require_subjects:
            return Document(text=text, subject_set=None, file_path=filename)

        subjfilename = self._get_subject_filename(filename)
        if subjfilename is None:
            # Subjects are required but no .tsv/.key companion was found.
            return None

        with open(subjfilename, encoding="utf-8-sig") as subjfile:
            subject_set = SubjectSet.from_string(
                subjfile.read(), self.subject_index, self.language
            )
        return Document(text=text, subject_set=subject_set, file_path=filename)

    @property
    def documents(self) -> Iterator[Document]:
        """Yield Document objects parsed from the .txt and .json files."""

        for path in self:
            if path.endswith(".txt"):
                document = self._read_txt_file(path)
            else:
                document = json_file_to_document(
                    path,
                    self.subject_index,
                    self.language,
                    self.require_subjects,
                )
            if document is not None:
                yield document
class DocumentFileTSV(DocumentCorpus):
    """A TSV file as a corpus of documents with subjects"""

    def __init__(
        self, path: str, subject_index: SubjectIndex, require_subjects=True
    ) -> None:
        self.path = path
        self.subject_index = subject_index
        self.require_subjects = require_subjects

    @property
    def documents(self) -> Iterator[Document]:
        """Yield one Document per valid line of the (optionally gzipped) file."""

        opener = gzip.open if self.path.endswith(".gz") else open
        with opener(self.path, mode="rt", encoding="utf-8-sig") as tsvfile:
            for line in tsvfile:
                yield from self._parse_tsv_line(line)

    def _parse_tsv_line(self, line: str) -> Iterator[Document]:
        """Parse a single "text<TAB>uri uri ..." line into a Document.

        Lines without a tab are either skipped with a warning (when subjects
        are required) or yielded as bare text documents."""

        if "\t" not in line:
            if self.require_subjects:
                logger.warning(
                    'Skipping invalid line (missing tab): "%s"', line.rstrip()
                )
            else:
                yield Document(text=line.strip())
            return

        text, uris = line.split("\t", maxsplit=1)
        subject_ids = {
            self.subject_index.by_uri(annif.util.cleanup_uri(uri))
            for uri in uris.split()
        }
        yield Document(text=text, subject_set=SubjectSet(subject_ids))
class DocumentFileCSV(DocumentCorpus):
    """A CSV file as a corpus of documents with subjects"""

    def __init__(
        self, path: str, subject_index: SubjectIndex, require_subjects=True
    ) -> None:
        self.path = path
        self.subject_index = subject_index
        self.require_subjects = require_subjects

    @property
    def documents(self) -> Iterator[Document]:
        """Yield one Document per row of the (optionally gzipped) CSV file.

        Raises OperationFailedException when the header row lacks the
        mandatory columns."""

        opener = gzip.open if self.path.endswith(".gz") else open
        with opener(self.path, mode="rt", encoding="utf-8-sig") as csvfile:
            reader = csv.DictReader(csvfile)
            if not self._check_fields(reader):
                if self.require_subjects:
                    raise OperationFailedException(
                        f"Cannot parse CSV file {self.path}. "
                        + "The file must have a header row that defines at least "
                        + "the columns 'text' and 'subject_uris'."
                    )
                raise OperationFailedException(
                    f"Cannot parse CSV file {self.path}. "
                    + "The file must have a header row that defines at least "
                    + "the column 'text'."
                )
            for row in reader:
                yield from self._parse_row(row)

    def _parse_row(self, row: dict[str, str]) -> Iterator[Document]:
        """Turn one CSV row into a Document with subjects and metadata."""

        # Resolve subject URIs either when subjects are mandatory or when the
        # row happens to carry them and an index is available to look them up.
        want_subjects = self.require_subjects or (
            self.subject_index is not None and "subject_uris" in row
        )
        if want_subjects:
            uris = (row["subject_uris"] or "").strip()
            subject_ids = {
                self.subject_index.by_uri(annif.util.cleanup_uri(uri))
                for uri in uris.split()
            }
        else:
            subject_ids = set()

        # Everything except the well-known columns becomes document metadata.
        reserved = ("document_id", "text", "subject_uris")
        metadata = {
            key: value for key, value in row.items() if key not in reserved
        }
        yield Document(
            text=(row["text"] or ""),
            subject_set=SubjectSet(subject_ids),
            metadata=metadata,
            document_id=row.get("document_id", None),
        )

    def _check_fields(self, reader: csv.DictReader) -> bool:
        """Return True when the CSV header defines the required columns."""

        fieldnames = reader.fieldnames
        if fieldnames is None:
            return False
        if self.require_subjects:
            return "text" in fieldnames and "subject_uris" in fieldnames
        return "text" in fieldnames

    @staticmethod
    def is_csv_file(path: str) -> bool:
        """return True if the path looks like a CSV file"""

        return path.lower().endswith((".csv", ".csv.gz"))
class DocumentFileJSONL(DocumentCorpus):
    """A JSON Lines file as a corpus of documents with subjects"""

    def __init__(
        self,
        path: str,
        subject_index: SubjectIndex,
        language: str,
        require_subjects=True,
    ) -> None:
        self.path = path
        self.subject_index = subject_index
        self.language = language
        self.require_subjects = require_subjects

    @property
    def documents(self) -> Iterator[Document]:
        """Yield one Document per line of the (optionally gzipped) JSONL file.

        Unparseable lines (json_to_document returned None) are skipped."""

        opener = gzip.open if self.path.endswith(".gz") else open
        with opener(self.path, mode="rt", encoding="utf-8") as jsonlfile:
            for line in jsonlfile:
                document = json_to_document(
                    self.path,
                    line,
                    self.subject_index,
                    self.language,
                    self.require_subjects,
                )
                if document is not None:
                    yield document

    @staticmethod
    def is_jsonl_file(path: str) -> bool:
        """return True if the path looks like a JSONL file"""

        return path.lower().endswith((".jsonl", ".jsonl.gz"))
class DocumentList(DocumentCorpus):
    """A document corpus based on a list or other iterable of Document
    objects"""

    def __init__(self, documents):
        self._documents = documents

    @property
    def documents(self):
        """Yield the wrapped Document objects one by one."""
        for document in self._documents:
            yield document
class TransformingDocumentCorpus(DocumentCorpus):
    """A document corpus that wraps another document corpus but transforms the
    documents using a given transform function"""

    def __init__(self, corpus, transform_fn):
        self._orig_corpus = corpus
        self._transform_fn = transform_fn

    @property
    def documents(self):
        """Yield each wrapped document after applying the transform function."""
        yield from map(self._transform_fn, self._orig_corpus.documents)
class LimitingDocumentCorpus(DocumentCorpus):
    """A document corpus that wraps another document corpus but limits the
    number of documents to a given limit"""

    def __init__(self, corpus, docs_limit):
        self._orig_corpus = corpus
        self.docs_limit = docs_limit

    @property
    def documents(self):
        """Yield at most docs_limit documents from the wrapped corpus."""
        yield from islice(self._orig_corpus.documents, self.docs_limit)