Passed
Pull Request — main (#872)
by Osma
04:53 queued 01:25
created

DocumentFileCSV._check_fields()   A

Complexity

Conditions 1

Size

Total Lines 3
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 3
nop 2
dl 0
loc 3
rs 10
c 0
b 0
f 0
1
"""Clases for supporting document corpora"""
2
3
from __future__ import annotations
4
5
import csv
6
import glob
7
import gzip
8
import json
9
import os.path
10
import re
11
from itertools import islice
12
from typing import TYPE_CHECKING
13
14
import annif.util
15
from annif.exception import OperationFailedException
16
17
from .types import Document, DocumentCorpus, SubjectSet
18
19
if TYPE_CHECKING:
20
    from collections.abc import Iterator
21
22
    from annif.corpus.subject import SubjectIndex
23
24
logger = annif.logger
25
26
27
class DocumentDirectory(DocumentCorpus):
28
    """A directory of files as a full text document corpus"""
29
30
    def __init__(
31
        self,
32
        path: str,
33
        subject_index: SubjectIndex | None = None,
34
        language: str | None = None,
35
        require_subjects: bool = False,
36
    ) -> None:
37
        self.path = path
38
        self.subject_index = subject_index
39
        self.language = language
40
        self.require_subjects = require_subjects
41
42
    def __iter__(self) -> Iterator[str]:
43
        """Iterate through the directory, yielding file paths with corpus documents."""
44
45
        # txt files
46
        for filename in sorted(glob.glob(os.path.join(self.path, "*.txt"))):
47
            yield filename
48
49
        # json files
50
        for filename in sorted(glob.glob(os.path.join(self.path, "*.json"))):
51
            yield filename
52
53
    @staticmethod
54
    def _get_subject_filename(filename: str) -> str | None:
55
        tsvfilename = re.sub(r"\.txt$", ".tsv", filename)
56
        if os.path.exists(tsvfilename):
57
            return tsvfilename
58
59
        keyfilename = re.sub(r"\.txt$", ".key", filename)
60
        if os.path.exists(keyfilename):
61
            return keyfilename
62
63
        return None
64
65
    def _read_txt_file(self, filename: str) -> Document | None:
66
        with open(filename, errors="replace", encoding="utf-8-sig") as docfile:
67
            text = docfile.read()
68
        if not self.require_subjects:
69
            return Document(text=text, subject_set=None)
70
71
        subjfilename = self._get_subject_filename(filename)
72
        if subjfilename is None:
73
            # subjects required but not found, skipping this docfile
74
            return None
75
76
        with open(subjfilename, encoding="utf-8-sig") as subjfile:
77
            subjects = SubjectSet.from_string(
78
                subjfile.read(), self.subject_index, self.language
79
            )
80
        return Document(text=text, subject_set=subjects)
81
82
    def _read_json_file(self, filename: str) -> Document | None:
83
        with open(filename) as jsonfile:
84
            data = json.load(jsonfile)
85
86
        subjects = SubjectSet(
87
            [
88
                self.subject_index.by_uri(subj["uri"])
89
                for subj in data.get("subjects", [])
90
            ]
91
        )
92
        if self.require_subjects and not subjects:
93
            return None
94
95
        return Document(
96
            text=data.get("text", ""),
97
            metadata=data.get("metadata", {}),
98
            subject_set=subjects,
99
        )
100
101
    @property
102
    def documents(self) -> Iterator[Document]:
103
        for docfilename in self:
104
            if docfilename.endswith(".txt"):
105
                doc = self._read_txt_file(docfilename)
106
            else:
107
                doc = self._read_json_file(docfilename)
108
109
            if doc is not None:
110
                yield doc
111
112
113
class DocumentFileTSV(DocumentCorpus):
114
    """A TSV file as a corpus of documents with subjects"""
115
116
    def __init__(self, path: str, subject_index: SubjectIndex) -> None:
117
        self.path = path
118
        self.subject_index = subject_index
119
120
    @property
121
    def documents(self) -> Iterator[Document]:
122
        if self.path.endswith(".gz"):
123
            opener = gzip.open
124
        else:
125
            opener = open
126
        with opener(self.path, mode="rt", encoding="utf-8-sig") as tsvfile:
127
            for line in tsvfile:
128
                yield from self._parse_tsv_line(line)
129
130
    def _parse_tsv_line(self, line: str) -> Iterator[Document]:
131
        if "\t" in line:
132
            text, uris = line.split("\t", maxsplit=1)
133
            subject_ids = {
134
                self.subject_index.by_uri(annif.util.cleanup_uri(uri))
135
                for uri in uris.split()
136
            }
137
            yield Document(text=text, subject_set=SubjectSet(subject_ids))
138
        else:
139
            logger.warning('Skipping invalid line (missing tab): "%s"', line.rstrip())
140
141
142
class DocumentFileCSV(DocumentCorpus):
143
    """A CSV file as a corpus of documents with subjects"""
144
145
    def __init__(self, path: str, subject_index: SubjectIndex) -> None:
146
        self.path = path
147
        self.subject_index = subject_index
148
149
    @property
150
    def documents(self) -> Iterator[Document]:
151
        if self.path.endswith(".gz"):
152
            opener = gzip.open
153
        else:
154
            opener = open
155
        with opener(self.path, mode="rt", encoding="utf-8-sig") as csvfile:
156
            reader = csv.DictReader(csvfile)
157
            if not self._check_fields(reader):
158
                raise OperationFailedException(
159
                    f"Cannot parse CSV file {self.path}. "
160
                    + "The file must have a header row that defines at least "
161
                    + "the columns 'text' and 'subject_uris'."
162
                )
163
            for row in reader:
164
                yield from self._parse_row(row)
165
166
    def _parse_row(self, row: dict[str, str]) -> Iterator[Document]:
167
        subject_ids = {
168
            self.subject_index.by_uri(annif.util.cleanup_uri(uri))
169
            for uri in (row["subject_uris"] or "").strip().split()
170
        }
171
        metadata = {
172
            key: val for key, val in row.items() if key not in ("text", "subject_uris")
173
        }
174
        yield Document(
175
            text=(row["text"] or ""),
176
            subject_set=SubjectSet(subject_ids),
177
            metadata=metadata,
178
        )
179
180
    def _check_fields(self, reader: csv.DictReader) -> bool:
181
        fns = reader.fieldnames
182
        return fns is not None and "text" in fns and "subject_uris" in fns
183
184
    @staticmethod
185
    def is_csv_file(path: str) -> bool:
186
        """return True if the path looks like a CSV file"""
187
188
        path_lc = path.lower()
189
        return path_lc.endswith(".csv") or path_lc.endswith(".csv.gz")
190
191
192
class DocumentList(DocumentCorpus):
193
    """A document corpus based on a list of other iterable of Document
194
    objects"""
195
196
    def __init__(self, documents):
197
        self._documents = documents
198
199
    @property
200
    def documents(self):
201
        yield from self._documents
202
203
204
class TransformingDocumentCorpus(DocumentCorpus):
205
    """A document corpus that wraps another document corpus but transforms the
206
    documents using a given transform function"""
207
208
    def __init__(self, corpus, transform_fn):
209
        self._orig_corpus = corpus
210
        self._transform_fn = transform_fn
211
212
    @property
213
    def documents(self):
214
        for doc in self._orig_corpus.documents:
215
            yield self._transform_fn(doc)
216
217
218
class LimitingDocumentCorpus(DocumentCorpus):
219
    """A document corpus that wraps another document corpus but limits the
220
    number of documents to a given limit"""
221
222
    def __init__(self, corpus, docs_limit):
223
        self._orig_corpus = corpus
224
        self.docs_limit = docs_limit
225
226
    @property
227
    def documents(self):
228
        for doc in islice(self._orig_corpus.documents, self.docs_limit):
229
            yield doc
230