Passed
Push — issue678-refactor-suggestionre... ( 3a8eec...d250de )
by Osma
04:34 queued 01:40
created

annif.suggestion.ListSuggestionResult.as_list()   A

Complexity

Conditions 1

Size

Total Lines 2
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 2
nop 1
dl 0
loc 2
rs 10
c 0
b 0
f 0
1
"""Representing suggested subjects."""
2
3
import abc
4
import collections
5
import itertools
6
7
import numpy as np
8
from scipy.sparse import dok_array
9
10
# A single scored hit: a subject ID (used elsewhere in this module as an index
# into score vectors) paired with its score.
SubjectSuggestion = collections.namedtuple("SubjectSuggestion", "subject_id score")

# Record with the fields hit_sets, weight and subjects.
# NOTE(review): not used within this module; presumably bundles suggestion
# sets with a weight for ensemble-style processing -- confirm against callers.
WeightedSuggestionsBatch = collections.namedtuple(
    "WeightedSuggestionsBatch", "hit_sets weight subjects"
)
14
15
16
class SuggestionResult(metaclass=abc.ABCMeta):
    """Abstract base class for a set of hits returned by an analysis
    operation.

    Concrete subclasses must implement every method below; the class itself
    cannot be instantiated."""

    @abc.abstractmethod
    def as_list(self):
        """Return the hits as an ordered sequence of SubjectSuggestion objects,
        highest scores first."""
        pass  # pragma: no cover

    @abc.abstractmethod
    def as_vector(self, size, destination=None):
        """Return the hits as a one-dimensional score vector of given size.
        If destination array is given (not None) it will be used, otherwise a
        new array will be created."""
        pass  # pragma: no cover

    @abc.abstractmethod
    def filter(self, subject_index, limit=None, threshold=0.0):
        """Return a subset of the hits, filtered by the given limit and
        score threshold, as another SuggestionResult object."""
        pass  # pragma: no cover

    @abc.abstractmethod
    def __len__(self):
        """Return the number of hits with non-zero scores."""
        pass  # pragma: no cover
43
44
45
class VectorSuggestionResult(SuggestionResult):
    """SuggestionResult implementation based primarily on NumPy vectors."""

    def __init__(self, vector):
        """Store a float32 copy of the score vector, clamped to 0.0 .. 1.0.

        The vector is indexed by subject ID. The descending subject order and
        the list representation are computed lazily on first use."""
        vector_f32 = vector.astype(np.float32)
        # limit scores to the range 0.0 .. 1.0
        self._vector = np.minimum(np.maximum(vector_f32, 0.0), 1.0)
        self._subject_order = None
        self._lsr = None

    def _vector_to_list_suggestion(self):
        """Build a ListSuggestionResult holding the positive-score subjects,
        highest scores first."""
        hits = []
        for subject_id in self.subject_order:
            score = self._vector[subject_id]
            if score <= 0.0:
                break  # we can skip the remaining ones
            hits.append(SubjectSuggestion(subject_id=subject_id, score=float(score)))
        return ListSuggestionResult(hits)

    @property
    def subject_order(self):
        """Subject IDs sorted by descending score (computed once, then cached)."""
        if self._subject_order is None:
            self._subject_order = np.argsort(self._vector)[::-1]
        return self._subject_order

    def as_list(self):
        """Return the hits as a list of SubjectSuggestion objects, highest
        scores first. The list is built on the first call and then reused."""
        if self._lsr is None:
            self._lsr = self._vector_to_list_suggestion()
        return self._lsr.as_list()

    def as_vector(self, size, destination=None):
        """Return the score vector.

        If destination is given, the scores are copied into it and it is
        returned; otherwise the internal vector itself is returned (not a
        copy). The size argument is not used by this implementation."""
        if destination is not None:
            np.copyto(destination, self._vector)
            return destination
        return self._vector

    def filter(self, subject_index, limit=None, threshold=0.0):
        """Return a ListSuggestionResult with the hits that survive filtering.

        A hit is kept when its score is strictly greater than threshold and
        its subject is not among subject_index.deprecated_ids(). When limit
        is given, only the limit highest-ranked non-deprecated subjects are
        eligible; the threshold is applied on top of that selection."""
        mask = self._vector > threshold
        deprecated_ids = subject_index.deprecated_ids()
        if limit is not None:
            limit_mask = np.zeros_like(self._vector, dtype=bool)
            # set membership keeps the per-subject check cheap
            deprecated_set = set(deprecated_ids)
            top_k_subjects = itertools.islice(
                (subj for subj in self.subject_order if subj not in deprecated_set),
                limit,
            )
            limit_mask[list(top_k_subjects)] = True
            mask = mask & limit_mask
        else:
            deprecated_mask = np.ones_like(self._vector, dtype=bool)
            deprecated_mask[deprecated_ids] = False
            mask = mask & deprecated_mask
        # multiplying by the boolean mask zeroes out the rejected scores
        vsr = VectorSuggestionResult(self._vector * mask)
        return ListSuggestionResult(vsr.as_list())

    def __len__(self):
        """Return the number of hits with a score above zero."""
        # convert the NumPy integer to a builtin int, as __len__ should return
        return int((self._vector > 0.0).sum())
102
103
104
class ListSuggestionResult(SuggestionResult):
    """SuggestionResult implementation based primarily on lists of hits."""

    def __init__(self, hits):
        """Keep the hits whose score is above zero, capping scores at 1.0.

        The order of the given hits is preserved."""
        self._list = [self._enforce_score_range(hit) for hit in hits if hit.score > 0.0]
        # lazily built dense vector, owned by this object (see as_vector)
        self._vector = None

    @staticmethod
    def _enforce_score_range(hit):
        """Return the hit with its score capped at 1.0."""
        if hit.score > 1.0:
            return hit._replace(score=1.0)
        return hit

    def _list_to_vector(self, size, destination):
        """Write the hit scores into a vector indexed by subject ID.

        A new zero-filled float32 vector of the given size is created when
        destination is None. A given destination only has the positions of
        the hits overwritten; its other elements are left untouched."""
        if destination is None:
            destination = np.zeros(size, dtype=np.float32)

        for hit in self._list:
            if hit.subject_id is not None:
                destination[hit.subject_id] = hit.score
        return destination

    def as_list(self):
        """Return the stored list of hits (not a copy)."""
        return self._list

    def as_vector(self, size, destination=None):
        """Return the hits as a one-dimensional score vector of given size.

        If destination is given, the hit scores are written into it on every
        call and it is returned. Otherwise an internally cached vector is
        returned; it is rebuilt if a different size is requested."""
        if destination is not None:
            # Always fill the caller's buffer, and never cache it: the caller
            # owns it and may overwrite it later.
            return self._list_to_vector(size, destination)
        if self._vector is None or self._vector.shape[0] != size:
            self._vector = self._list_to_vector(size, None)
        return self._vector

    def filter(self, subject_index, limit=None, threshold=0.0):
        """Return a new ListSuggestionResult with the filtered hits.

        Hits are sorted by descending score; a hit is kept when its score is
        at least threshold and above zero and its subject_id is not None.
        At most limit hits are kept when limit is given. The subject_index
        argument is not used by this implementation."""
        hits = sorted(self._list, key=lambda hit: hit.score, reverse=True)
        filtered_hits = [
            hit
            for hit in hits
            if hit.score >= threshold and hit.score > 0.0 and hit.subject_id is not None
        ]
        if limit is not None:
            filtered_hits = filtered_hits[:limit]
        return ListSuggestionResult(filtered_hits)

    def __len__(self):
        """Return the number of stored hits."""
        return len(self._list)
147
148
149
class SparseSuggestionResult(SuggestionResult):
    """SuggestionResult implementation backed by a single row of a sparse array."""

    def __init__(self, array, idx):
        """Remember the sparse array and the index of the row this result
        represents. The array is referenced, not copied."""
        self._array = array
        self._idx = idx

    def as_list(self):
        """Return the non-zero entries of the row as SubjectSuggestion
        objects, highest scores first."""
        # slice the row once and reuse it for both the column lookup and the
        # score lookup
        row = self._array[[self._idx], :]
        _, cols = row.nonzero()
        suggestions = [
            SubjectSuggestion(subject_id=col, score=float(row[0, col]))
            for col in cols
        ]
        return sorted(
            suggestions, key=lambda suggestion: suggestion.score, reverse=True
        )

    def as_vector(self, size, destination=None):
        """Return the row as a dense one-dimensional score vector.

        If destination is given, the scores are copied into it and it is
        returned; otherwise a newly created array is returned. The size
        argument is not used by this implementation."""
        vector = self._array[[self._idx], :].toarray()[0]
        if destination is not None:
            np.copyto(destination, vector)
            return destination
        return vector

    def filter(self, subject_index, limit=None, threshold=0.0):
        """Return the filtered hits by delegating to
        ListSuggestionResult.filter on the list form of this row."""
        lsr = ListSuggestionResult(self.as_list())
        return lsr.filter(subject_index, limit, threshold)

    def __len__(self):
        """Return the number of non-zero entries in the row."""
        _, cols = self._array[[self._idx], :].nonzero()
        return len(cols)
179
180
181
class SuggestionBatch:
    """Subject suggestions for a batch of documents."""

    def __init__(self, array):
        """Create a new SuggestionBatch from a csr_array"""
        # one row per document (see __len__ and __getitem__)
        self.array = array

    @classmethod
    def from_sequence(cls, suggestion_results, vocab_size):
        """Create a new SuggestionBatch from a sequence of SuggestionResult objects.

        The resulting array has one row per result and vocab_size columns;
        each suggestion's score is stored in the column given by its
        subject_id."""

        # create a dok_array for fast construction
        ar = dok_array((len(suggestion_results), vocab_size), dtype=np.float32)
        for idx, result in enumerate(suggestion_results):
            for suggestion in result.as_list():
                ar[idx, suggestion.subject_id] = suggestion.score
        return cls(ar.tocsr())

    def filter(self, limit=None, threshold=0.0):
        """Return a subset of the hits, filtered by the given limit and
        score threshold, as another SuggestionBatch object."""

        # imported here rather than at module level
        # NOTE(review): presumably to avoid a circular import -- confirm
        from annif.util import filter_suggestion

        return SuggestionBatch(filter_suggestion(self.array, limit, threshold))

    def __getitem__(self, idx):
        """Return the suggestions of the document at row idx as a
        SparseSuggestionResult.

        Raises IndexError when idx is negative or not less than len(self)."""
        if idx < 0 or idx >= len(self):
            raise IndexError(f"batch index {idx} out of range")
        return SparseSuggestionResult(self.array, idx)

    def __len__(self):
        """Return the number of rows (documents) in the batch."""
        return self.array.shape[0]
214
215
216
class SuggestionResults:
    """Subject suggestions for a potentially very large number of documents."""

    def __init__(self, batches):
        """Initialize a new SuggestionResults from an iterable that provides
        SuggestionBatch objects."""

        self.batches = batches

    def filter(self, limit=None, threshold=0.0):
        """Return a view of these suggestions, filtered by the given limit
        and/or threshold, as another SuggestionResults object.

        The batches are filtered lazily through a generator, so the returned
        object can be iterated only once."""

        return SuggestionResults(
            (batch.filter(limit, threshold) for batch in self.batches)
        )

    def __iter__(self):
        """Iterate over the items of all batches, one batch after another."""
        # chain.from_iterable already returns an iterator
        return itertools.chain.from_iterable(self.batches)
235