Passed
Pull Request — main (#872)
by Osma
06:14 queued 03:12
created

annif.corpus.document.DocumentList.documents()   A

Complexity

Conditions 1

Size

Total Lines 3
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 3
dl 0
loc 3
rs 10
c 0
b 0
f 0
cc 1
nop 1
1
"""Clases for supporting document corpora"""
2
3
from __future__ import annotations
4
5
import csv
6
import glob
7
import gzip
8
import json
9
import os.path
10
import re
11
from itertools import islice
12
from typing import TYPE_CHECKING
13
14
import annif.util
15
from annif.exception import OperationFailedException
16
17
from .types import Document, DocumentCorpus, SubjectSet
18
19
if TYPE_CHECKING:
20
    from collections.abc import Iterator
21
22
    from annif.corpus.subject import SubjectIndex
23
24
logger = annif.logger
25
26
27
class DocumentDirectory(DocumentCorpus):
    """A directory of files as a full text document corpus.

    Documents are stored either as ``*.txt`` files (with subjects in a
    sibling ``*.tsv`` or ``*.key`` file) or as ``*.json`` files that
    carry text, metadata and subjects in a single file."""

    def __init__(
        self,
        path: str,
        subject_index: SubjectIndex | None = None,
        language: str | None = None,
        require_subjects: bool = False,
    ) -> None:
        """Initialize the corpus.

        :param path: directory containing the document files
        :param subject_index: index used to resolve subject URIs/labels
        :param language: language of subject labels in TSV/key files
        :param require_subjects: if True, skip documents without subjects
        """
        self.path = path
        self.subject_index = subject_index
        self.language = language
        self.require_subjects = require_subjects

    def __iter__(self) -> Iterator[str]:
        """Iterate through the directory, yielding file paths with corpus documents."""

        # txt files first, then json files; sorted for a deterministic order
        for filename in sorted(glob.glob(os.path.join(self.path, "*.txt"))):
            yield filename

        # json files
        for filename in sorted(glob.glob(os.path.join(self.path, "*.json"))):
            yield filename

    @staticmethod
    def _get_subject_filename(filename: str) -> str | None:
        """Return the subject file path (.tsv preferred, then .key) for the
        given .txt document file, or None if neither exists."""
        for ext in (".tsv", ".key"):
            subjfilename = re.sub(r"\.txt$", ext, filename)
            if os.path.exists(subjfilename):
                return subjfilename
        return None

    def _read_txt_file(self, filename: str) -> Document | None:
        """Read a plain text document (and its subject file, if required).

        Returns None when subjects are required but no subject file exists."""
        with open(filename, errors="replace", encoding="utf-8-sig") as docfile:
            text = docfile.read()
        if not self.require_subjects:
            return Document(text=text, subject_set=None)

        subjfilename = self._get_subject_filename(filename)
        if subjfilename is None:
            # subjects required but not found, skipping this docfile
            return None

        with open(subjfilename, encoding="utf-8-sig") as subjfile:
            subjects = SubjectSet.from_string(
                subjfile.read(), self.subject_index, self.language
            )
        return Document(text=text, subject_set=subjects)

    def _read_json_file(self, filename: str) -> Document | None:
        """Read a JSON document file.

        Returns None for empty or unparseable files, and for files without
        subjects when subjects are required."""
        if os.path.getsize(filename) == 0:
            # include the actual path in the warning (was a placeholder-less
            # f-string that logged "(unknown)"); use lazy %-style args
            logger.warning("Skipping empty file %s", filename)
            return None

        # explicit encoding: JSON is defined as UTF-8; avoid relying on the
        # platform default like the other open() calls in this module
        with open(filename, encoding="utf-8") as jsonfile:
            try:
                data = json.load(jsonfile)
            except json.JSONDecodeError as err:
                logger.warning("JSON parsing failed for file %s: %s", filename, err)
                return None

        subjects = SubjectSet(
            [
                self.subject_index.by_uri(subj["uri"])
                for subj in data.get("subjects", [])
            ]
        )
        if self.require_subjects and not subjects:
            return None

        return Document(
            text=data.get("text", ""),
            metadata=data.get("metadata", {}),
            subject_set=subjects,
        )

    @property
    def documents(self) -> Iterator[Document]:
        """Yield the parsed Documents, skipping files that could not be read."""
        for docfilename in self:
            if docfilename.endswith(".txt"):
                doc = self._read_txt_file(docfilename)
            else:
                doc = self._read_json_file(docfilename)

            if doc is not None:
                yield doc
119
120
121
class DocumentFileTSV(DocumentCorpus):
    """A TSV file as a corpus of documents with subjects"""

    def __init__(self, path: str, subject_index: SubjectIndex) -> None:
        self.path = path
        self.subject_index = subject_index

    @property
    def documents(self) -> Iterator[Document]:
        """Yield Documents parsed from the (possibly gzip-compressed) TSV file."""
        opener = gzip.open if self.path.endswith(".gz") else open
        with opener(self.path, mode="rt", encoding="utf-8-sig") as tsvfile:
            for row in tsvfile:
                yield from self._parse_tsv_line(row)

    def _parse_tsv_line(self, line: str) -> Iterator[Document]:
        """Parse one TSV line into a Document; warn and yield nothing when the
        line has no tab separator."""
        if "\t" not in line:
            logger.warning('Skipping invalid line (missing tab): "%s"', line.rstrip())
            return
        text, uris = line.split("\t", maxsplit=1)
        subject_ids = {
            self.subject_index.by_uri(annif.util.cleanup_uri(uri))
            for uri in uris.split()
        }
        yield Document(text=text, subject_set=SubjectSet(subject_ids))
148
149
150
class DocumentFileCSV(DocumentCorpus):
    """A CSV file as a corpus of documents with subjects"""

    def __init__(self, path: str, subject_index: SubjectIndex) -> None:
        self.path = path
        self.subject_index = subject_index

    @property
    def documents(self) -> Iterator[Document]:
        """Yield Documents parsed from the (possibly gzip-compressed) CSV file.

        Raises OperationFailedException when the header row does not define
        the required columns."""
        opener = gzip.open if self.path.endswith(".gz") else open
        with opener(self.path, mode="rt", encoding="utf-8-sig") as csvfile:
            reader = csv.DictReader(csvfile)
            if not self._check_fields(reader):
                raise OperationFailedException(
                    f"Cannot parse CSV file {self.path}. "
                    + "The file must have a header row that defines at least "
                    + "the columns 'text' and 'subject_uris'."
                )
            for record in reader:
                yield from self._parse_row(record)

    def _parse_row(self, row: dict[str, str]) -> Iterator[Document]:
        """Convert a single CSV row into a Document."""
        uri_field = (row["subject_uris"] or "").strip()
        subject_ids = set()
        for uri in uri_field.split():
            subject_ids.add(self.subject_index.by_uri(annif.util.cleanup_uri(uri)))
        # every column except the two well-known ones becomes metadata
        extra = {}
        for key, val in row.items():
            if key not in ("text", "subject_uris"):
                extra[key] = val
        yield Document(
            text=(row["text"] or ""),
            subject_set=SubjectSet(subject_ids),
            metadata=extra,
        )

    def _check_fields(self, reader: csv.DictReader) -> bool:
        """Return True if the header row defines both required columns."""
        fieldnames = reader.fieldnames
        if fieldnames is None:
            return False
        return all(col in fieldnames for col in ("text", "subject_uris"))

    @staticmethod
    def is_csv_file(path: str) -> bool:
        """return True if the path looks like a CSV file"""
        return path.lower().endswith((".csv", ".csv.gz"))
198
199
200
class DocumentList(DocumentCorpus):
    """A document corpus backed by a list (or any iterable) of Document
    objects"""

    def __init__(self, documents):
        self._documents = documents

    @property
    def documents(self):
        """Iterate over the wrapped Document objects."""
        for doc in self._documents:
            yield doc
210
211
212
class TransformingDocumentCorpus(DocumentCorpus):
    """A document corpus that wraps another document corpus but transforms the
    documents using a given transform function"""

    def __init__(self, corpus, transform_fn):
        self._orig_corpus = corpus
        self._transform_fn = transform_fn

    @property
    def documents(self):
        """Lazily yield each wrapped document passed through the transform
        function."""
        yield from map(self._transform_fn, self._orig_corpus.documents)
224
225
226
class LimitingDocumentCorpus(DocumentCorpus):
    """A document corpus that wraps another document corpus but limits the
    number of documents to a given limit"""

    def __init__(self, corpus, docs_limit):
        self._orig_corpus = corpus
        self.docs_limit = docs_limit

    @property
    def documents(self):
        """Yield at most docs_limit documents from the wrapped corpus."""
        yield from islice(self._orig_corpus.documents, self.docs_limit)
238