annif.corpus.document.DocumentFileTSV.documents()   A
last analyzed

Complexity

Conditions 4

Size

Total Lines 9
Code Lines 8

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 4
eloc 8
nop 1
dl 0
loc 9
rs 10
c 0
b 0
f 0
1
"""Classes for supporting document corpora"""
2
3
from __future__ import annotations
4
5
import csv
6
import glob
7
import gzip
8
import os.path
9
import re
10
from itertools import islice
11
from typing import TYPE_CHECKING
12
13
import annif.util
14
from annif.exception import OperationFailedException
15
16
from .json import json_file_to_document, json_to_document
17
from .types import Document, DocumentCorpus, SubjectSet
18
19
if TYPE_CHECKING:
20
    from collections.abc import Iterator
21
22
    from annif.vocab import SubjectIndex
23
24
# Module-level alias for the logger object exposed as annif.logger; used in
# this module to warn about TSV lines that are skipped.
logger = annif.logger
25
26
27
class DocumentDirectory(DocumentCorpus):
    """Full text document corpus backed by the files found in one directory"""

    def __init__(
        self,
        path: str,
        subject_index: SubjectIndex | None = None,
        language: str | None = None,
        require_subjects: bool = False,
    ) -> None:
        self.path = path
        self.subject_index = subject_index
        self.language = language
        self.require_subjects = require_subjects

    def __iter__(self) -> Iterator[str]:
        """Yield the paths of the corpus document files in the directory:
        first all *.txt files, then all *.json files, each group sorted."""

        for pattern in ("*.txt", "*.json"):
            yield from sorted(glob.glob(os.path.join(self.path, pattern)))

    @staticmethod
    def _get_subject_filename(filename: str) -> str | None:
        """Return the path of an existing subject file matching the given
        .txt file name (.tsv is tried before .key), or None if neither
        exists."""

        for extension in (".tsv", ".key"):
            candidate = re.sub(r"\.txt$", extension, filename)
            if os.path.exists(candidate):
                return candidate
        return None

    def _read_txt_file(self, filename: str) -> Document | None:
        """Read a .txt file into a Document. When subjects are required, the
        subjects are parsed from the accompanying subject file; None is
        returned if no such file exists."""

        with open(filename, errors="replace", encoding="utf-8-sig") as docfile:
            text = docfile.read()

        subjects = None
        if self.require_subjects:
            subjfilename = self._get_subject_filename(filename)
            if subjfilename is None:
                # subjects required but not found, skipping this docfile
                return None
            with open(subjfilename, encoding="utf-8-sig") as subjfile:
                subjects = SubjectSet.from_string(
                    subjfile.read(), self.subject_index, self.language
                )

        return Document(text=text, subject_set=subjects, file_path=filename)

    @property
    def documents(self) -> Iterator[Document]:
        """Yield a Document for each file of the directory, leaving out the
        files for which the reader returned None."""

        for docfilename in self:
            if not docfilename.endswith(".txt"):
                doc = json_file_to_document(
                    docfilename,
                    self.subject_index,
                    self.language,
                    self.require_subjects,
                )
            else:
                doc = self._read_txt_file(docfilename)

            if doc is None:
                continue
            yield doc
97
98
99
class DocumentFileTSV(DocumentCorpus):
    """Corpus of documents with subjects, read from a TSV file that may be
    gzip-compressed (path ending in .gz)"""

    def __init__(
        self, path: str, subject_index: SubjectIndex, require_subjects=True
    ) -> None:
        self.path = path
        self.subject_index = subject_index
        self.require_subjects = require_subjects

    @property
    def documents(self) -> Iterator[Document]:
        """Yield the documents parsed from the lines of the file."""

        opener = gzip.open if self.path.endswith(".gz") else open
        with opener(self.path, mode="rt", encoding="utf-8-sig") as tsvfile:
            for line in tsvfile:
                yield from self._parse_tsv_line(line)

    def _parse_tsv_line(self, line: str) -> Iterator[Document]:
        """Yield at most one Document for a single line of the file.

        A line with a tab is split at the first tab into the text and a
        whitespace-separated list of subject URIs. A line without a tab is
        skipped with a warning when subjects are required; otherwise it is
        yielded as a document without a subject set."""

        if "\t" not in line:
            if self.require_subjects:
                logger.warning(
                    'Skipping invalid line (missing tab): "%s"', line.rstrip()
                )
            else:
                yield Document(text=line.strip())
            return

        # only the first tab separates the text from the URIs
        text, _, uri_field = line.partition("\t")
        subject_ids = {
            self.subject_index.by_uri(annif.util.cleanup_uri(raw_uri))
            for raw_uri in uri_field.split()
        }
        yield Document(text=text, subject_set=SubjectSet(subject_ids))
134
135
136
class DocumentFileCSV(DocumentCorpus):
    """Corpus of documents with subjects, read from a CSV file that may be
    gzip-compressed (path ending in .gz)"""

    def __init__(
        self, path: str, subject_index: SubjectIndex, require_subjects=True
    ) -> None:
        self.path = path
        self.subject_index = subject_index
        self.require_subjects = require_subjects

    @property
    def documents(self) -> Iterator[Document]:
        """Yield one Document per data row of the file.

        Raises OperationFailedException if the header row does not define
        the required columns."""

        opener = gzip.open if self.path.endswith(".gz") else open
        with opener(self.path, mode="rt", encoding="utf-8-sig") as csvfile:
            reader = csv.DictReader(csvfile)
            if not self._check_fields(reader):
                # the message names the columns needed in the current mode
                if self.require_subjects:
                    required = "the columns 'text' and 'subject_uris'."
                else:
                    required = "the column 'text'."
                raise OperationFailedException(
                    f"Cannot parse CSV file {self.path}. "
                    "The file must have a header row that defines at least "
                    f"{required}"
                )
            for row in reader:
                yield from self._parse_row(row)

    def _parse_row(self, row: dict[str, str]) -> Iterator[Document]:
        """Yield a Document built from one row: the 'text' column is the
        text, 'subject_uris' the whitespace-separated subject URIs, and all
        remaining columns are passed on as metadata."""

        has_subject_column = (
            self.subject_index is not None and "subject_uris" in row
        )
        if self.require_subjects or has_subject_column:
            raw_uris = (row["subject_uris"] or "").strip().split()
            subject_ids = {
                self.subject_index.by_uri(annif.util.cleanup_uri(raw_uri))
                for raw_uri in raw_uris
            }
        else:
            subject_ids = set()

        metadata = {}
        for key, val in row.items():
            if key not in ("text", "subject_uris"):
                metadata[key] = val

        yield Document(
            text=(row["text"] or ""),
            subject_set=SubjectSet(subject_ids),
            metadata=metadata,
        )

    def _check_fields(self, reader: csv.DictReader) -> bool:
        """Return True if the header of the reader defines the 'text' column
        and, when subjects are required, also the 'subject_uris' column."""

        fns = reader.fieldnames
        if fns is None or "text" not in fns:
            return False
        return not self.require_subjects or "subject_uris" in fns

    @staticmethod
    def is_csv_file(path: str) -> bool:
        """return True if the path looks like a CSV file"""

        return path.lower().endswith((".csv", ".csv.gz"))
202
203
204
class DocumentFileJSONL(DocumentCorpus):
    """A JSON Lines file as a corpus of documents with subjects"""

    def __init__(
        self,
        path: str,
        subject_index: SubjectIndex,
        language: str,
        require_subjects=True,
    ) -> None:
        self.path = path
        self.subject_index = subject_index
        self.language = language
        self.require_subjects = require_subjects

    @property
    def documents(self) -> Iterator[Document]:
        """Yield a Document for each line of the file, leaving out the lines
        for which json_to_document returned None.

        The file is read through gzip when the path ends in ".gz"."""

        if self.path.endswith(".gz"):
            opener = gzip.open
        else:
            opener = open
        # utf-8-sig (as used by the other corpus readers in this module)
        # drops an optional leading byte order mark, so that it does not end
        # up in front of the first JSON record; files without a BOM are
        # decoded exactly as with plain utf-8.
        with opener(self.path, mode="rt", encoding="utf-8-sig") as jsonlfile:
            for line in jsonlfile:
                doc = json_to_document(
                    self.path,
                    line,
                    self.subject_index,
                    self.language,
                    self.require_subjects,
                )
                if doc is not None:
                    yield doc

    @staticmethod
    def is_jsonl_file(path: str) -> bool:
        """return True if the path looks like a JSONL file"""

        path_lc = path.lower()
        return path_lc.endswith(".jsonl") or path_lc.endswith(".jsonl.gz")
243
244
245
class DocumentList(DocumentCorpus):
    """A document corpus backed by a list or any other iterable of Document
    objects"""

    def __init__(self, documents):
        self._documents = documents

    @property
    def documents(self):
        """Yield the documents of the wrapped iterable, in its own order."""

        wrapped = self._documents
        yield from wrapped
255
256
257
class TransformingDocumentCorpus(DocumentCorpus):
    """A document corpus wrapping another corpus; each document of the
    wrapped corpus is passed through a given transform function"""

    def __init__(self, corpus, transform_fn):
        self._orig_corpus = corpus
        self._transform_fn = transform_fn

    @property
    def documents(self):
        """Yield the result of the transform function for each document of
        the wrapped corpus."""

        yield from (
            self._transform_fn(original) for original in self._orig_corpus.documents
        )
269
270
271
class LimitingDocumentCorpus(DocumentCorpus):
    """A document corpus wrapping another corpus; yields at most a given
    number of its documents"""

    def __init__(self, corpus, docs_limit):
        self._orig_corpus = corpus
        self.docs_limit = docs_limit

    @property
    def documents(self):
        """Yield documents of the wrapped corpus, stopping after docs_limit
        documents."""

        yield from islice(self._orig_corpus.documents, self.docs_limit)
283