Passed
Push — issue678-refactor-suggestionre... ( 8342bd...2bf8c6 )
by Osma
03:05
created

annif.suggestion.SuggestionBatch.from_sequence()   A

Complexity

Conditions 3

Size

Total Lines 10
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
eloc 7
nop 3
dl 0
loc 10
rs 10
c 0
b 0
f 0
1
"""Representing suggested subjects."""
2
3
import abc
4
import collections
5
import itertools
6
7
import numpy as np
8
from scipy.sparse import csr_array, dok_array
9
10
# A single suggested subject: subject_id is used elsewhere in this module as
# an index into score vectors, score is the numeric score of the suggestion.
SubjectSuggestion = collections.namedtuple("SubjectSuggestion", "subject_id score")
# Record with the fields hit_sets, weight and subjects.
WeightedSuggestionsBatch = collections.namedtuple(
    "WeightedSuggestionsBatch", "hit_sets weight subjects"
)
14
15
16
class SuggestionFilter:
    """A reusable filter for filtering SubjectSuggestion objects.

    The subject index, limit and threshold are fixed when the filter is
    created; calling the filter on a result object applies them lazily."""

    def __init__(self, subject_index, limit=None, threshold=0.0):
        """Store the parameters that will later be passed on to the
        filter() method of the results being filtered."""
        self._subject_index = subject_index
        self._limit = limit
        self._threshold = threshold

    def __call__(self, orighits):
        """Return a LazySuggestionResult wrapping the filtered hits.

        The actual call to orighits.filter() is deferred until the returned
        object is first accessed."""
        return LazySuggestionResult(
            lambda: orighits.filter(self._subject_index, self._limit, self._threshold)
        )
28
29
30
class SuggestionResult(metaclass=abc.ABCMeta):
    """Abstract base class for a set of hits returned by an analysis
    operation.

    Concrete subclasses must implement every method below; the class itself
    cannot be instantiated."""

    @abc.abstractmethod
    def as_list(self):
        """Return the hits as an ordered sequence of SubjectSuggestion objects,
        highest scores first."""
        pass  # pragma: no cover

    @abc.abstractmethod
    def as_vector(self, size, destination=None):
        """Return the hits as a one-dimensional score vector of given size.
        If destination array is given (not None) it will be used, otherwise a
        new array will be created."""
        pass  # pragma: no cover

    @abc.abstractmethod
    def filter(self, subject_index, limit=None, threshold=0.0):
        """Return a subset of the hits, filtered by the given limit and
        score threshold, as another SuggestionResult object."""
        pass  # pragma: no cover

    @abc.abstractmethod
    def __len__(self):
        """Return the number of hits with non-zero scores."""
        pass  # pragma: no cover
57
58
59
class LazySuggestionResult(SuggestionResult):
    """SuggestionResult implementation that wraps another SuggestionResult which
    is initialized lazily only when it is actually accessed. Method calls
    will be proxied to the wrapped SuggestionResult."""

    def __init__(self, construct):
        """Create the proxy object. The given construct function will be
        called to create the actual SuggestionResult when it is needed."""
        self._construct = construct
        # the wrapped result; stays None until the first method call
        self._object = None

    def _initialize(self):
        """Build the wrapped result by calling the construct function, unless
        a result has already been stored."""
        if self._object is None:
            self._object = self._construct()

    def as_list(self):
        """Return as_list() of the wrapped result, constructing it first if
        necessary."""
        self._initialize()
        return self._object.as_list()

    def as_vector(self, size, destination=None):
        """Return as_vector(size, destination) of the wrapped result,
        constructing it first if necessary."""
        self._initialize()
        return self._object.as_vector(size, destination)

    def filter(self, subject_index, limit=None, threshold=0.0):
        """Return filter(subject_index, limit, threshold) of the wrapped
        result, constructing it first if necessary."""
        self._initialize()
        return self._object.filter(subject_index, limit, threshold)

    def __len__(self):
        """Return the length of the wrapped result, constructing it first if
        necessary."""
        self._initialize()
        return len(self._object)
89
90
91
class VectorSuggestionResult(SuggestionResult):
    """SuggestionResult implementation based primarily on NumPy vectors."""

    def __init__(self, vector):
        """Store a float32 copy of the given score vector, indexed by subject
        id, with all scores clamped to the range 0.0 .. 1.0."""
        vector_f32 = vector.astype(np.float32)
        # limit scores to the range 0.0 .. 1.0
        self._vector = np.minimum(np.maximum(vector_f32, 0.0), 1.0)
        # caches, filled in on first use
        self._subject_order = None
        self._lsr = None

    def _vector_to_list_suggestion(self):
        """Convert the vector into a ListSuggestionResult containing only the
        subjects with positive scores, highest scores first."""
        hits = []
        for subject_id in self.subject_order:
            score = self._vector[subject_id]
            if score <= 0.0:
                break  # we can skip the remaining ones
            hits.append(SubjectSuggestion(subject_id=subject_id, score=float(score)))
        return ListSuggestionResult(hits)

    @property
    def subject_order(self):
        """Subject ids sorted by descending score (computed once and cached)."""
        if self._subject_order is None:
            self._subject_order = np.argsort(self._vector)[::-1]
        return self._subject_order

    def as_list(self):
        """Return the positive-score hits as a list of SubjectSuggestion
        objects, highest scores first. The conversion is cached."""
        if self._lsr is None:
            self._lsr = self._vector_to_list_suggestion()
        return self._lsr.as_list()

    def as_vector(self, size, destination=None):
        """Return the scores as a vector.

        If destination is given, the scores are copied into it and it is
        returned; otherwise the internal vector itself (not a copy) is
        returned. The size argument is not used by this implementation."""
        if destination is not None:
            np.copyto(destination, self._vector)
            return destination
        return self._vector

    def filter(self, subject_index, limit=None, threshold=0.0):
        """Return a ListSuggestionResult with the hits that pass the filter.

        Only scores strictly greater than threshold are kept, and subjects
        listed by subject_index.deprecated_ids() are always excluded. If limit
        is given, only the limit highest-ranked non-deprecated subjects are
        considered."""
        mask = self._vector > threshold
        deprecated_ids = subject_index.deprecated_ids()
        if limit is not None:
            limit_mask = np.zeros_like(self._vector, dtype=bool)
            deprecated_set = set(deprecated_ids)
            # walk the subjects in descending score order, skipping deprecated
            # ones, and keep only the first `limit` of them
            top_k_subjects = itertools.islice(
                (subj for subj in self.subject_order if subj not in deprecated_set),
                limit,
            )
            limit_mask[list(top_k_subjects)] = True
            mask = mask & limit_mask
        else:
            deprecated_mask = np.ones_like(self._vector, dtype=bool)
            deprecated_mask[deprecated_ids] = False
            mask = mask & deprecated_mask
        # multiplying by the boolean mask zeroes out the rejected scores
        vsr = VectorSuggestionResult(self._vector * mask)
        return ListSuggestionResult(vsr.as_list())

    def __len__(self):
        """Return the number of subjects with a score above zero."""
        # count_nonzero yields a plain Python int rather than a NumPy scalar
        return int(np.count_nonzero(self._vector > 0.0))
148
149
150
class ListSuggestionResult(SuggestionResult):
    """SuggestionResult implementation based primarily on lists of hits."""

    def __init__(self, hits):
        """Store the given hits, dropping those whose score is not above zero
        and capping scores above 1.0 to 1.0. The order of hits is kept."""
        self._list = [self._enforce_score_range(hit) for hit in hits if hit.score > 0.0]
        # cache for the vector allocated by as_vector() without a destination
        self._vector = None

    @staticmethod
    def _enforce_score_range(hit):
        """Return the hit with its score capped to at most 1.0."""
        if hit.score > 1.0:
            return hit._replace(score=1.0)
        return hit

    def _list_to_vector(self, size, destination):
        """Write the scores of the hits into a vector indexed by subject id.

        If destination is None, a new zero-filled float32 vector of the given
        size is created; otherwise the scores are written into destination
        (its other elements are left as they are). Hits whose subject_id is
        None are skipped."""
        if destination is None:
            destination = np.zeros(size, dtype=np.float32)

        for hit in self._list:
            if hit.subject_id is not None:
                destination[hit.subject_id] = hit.score
        return destination

    def as_list(self):
        """Return the stored list of hits (the internal list, not a copy)."""
        return self._list

    def as_vector(self, size, destination=None):
        """Return the hits as a one-dimensional score vector of given size.

        If destination is given, the scores are always written into it and it
        is returned. Otherwise a vector is allocated on the first call and the
        same cached vector is returned on later calls."""
        if destination is not None:
            # Always fill the caller's array. Caching it would alias an array
            # the caller owns, and returning a previously cached vector would
            # leave this destination untouched.
            return self._list_to_vector(size, destination)
        if self._vector is None:
            self._vector = self._list_to_vector(size, None)
        return self._vector

    def filter(self, subject_index, limit=None, threshold=0.0):
        """Return a new ListSuggestionResult with the hits that pass the filter.

        Hits are sorted by descending score; those with a score below
        threshold, a score not above zero, or a subject_id of None are
        dropped, and at most limit hits are kept if limit is given. The
        subject_index argument is not used by this implementation."""
        hits = sorted(self._list, key=lambda hit: hit.score, reverse=True)
        filtered_hits = [
            hit
            for hit in hits
            if hit.score >= threshold and hit.score > 0.0 and hit.subject_id is not None
        ]
        if limit is not None:
            filtered_hits = filtered_hits[:limit]
        return ListSuggestionResult(filtered_hits)

    def __len__(self):
        """Return the number of stored hits."""
        return len(self._list)
193
194
195
class SparseSuggestionResult(SuggestionResult):
    """SuggestionResult implementation backed by a single row of a sparse array."""

    def __init__(self, array, idx):
        """Store the sparse array and the index of the row that holds the
        scores of this result."""
        self._array = array
        self._idx = idx

    def as_list(self):
        """Return the nonzero entries of the row as a list of
        SubjectSuggestion objects, highest scores first."""
        # slice the row out once instead of indexing the full array per entry
        row = self._array[[self._idx], :]
        _, cols = row.nonzero()
        suggestions = [
            SubjectSuggestion(subject_id=col, score=float(row[0, col]))
            for col in cols
        ]
        return sorted(
            suggestions, key=lambda suggestion: suggestion.score, reverse=True
        )

    def as_vector(self, size, destination=None):
        """Return the row as a dense one-dimensional score vector.

        If destination is given, the scores are copied into it and it is
        returned; otherwise a newly created dense array is returned. The size
        argument is not used by this implementation."""
        vector = self._array[[self._idx], :].toarray()[0]
        if destination is not None:
            np.copyto(destination, vector)
            return destination
        return vector

    def filter(self, subject_index, limit=None, threshold=0.0):
        """Return the filtered hits by converting the row into a
        ListSuggestionResult and delegating to its filter() method."""
        lsr = ListSuggestionResult(self.as_list())
        return lsr.filter(subject_index, limit, threshold)

    def __len__(self):
        """Return the number of nonzero entries in the row."""
        _, cols = self._array[[self._idx], :].nonzero()
        return len(cols)
225
226
227
class SuggestionBatch:
    """Subject suggestions for a batch of documents."""

    def __init__(self, array):
        """Create a new SuggestionBatch from a csr_array"""
        # one row per document, one column per subject id
        self.array = array

    @classmethod
    def from_sequence(cls, suggestion_results, vocab_size):
        """Create a new SuggestionBatch from a sequence of SuggestionResult objects.

        The resulting array has one row per result and vocab_size columns;
        each suggestion's score is stored in the column given by its
        subject_id."""

        # create a dok_array for fast construction
        ar = dok_array((len(suggestion_results), vocab_size), dtype=np.float32)
        for idx, result in enumerate(suggestion_results):
            for suggestion in result.as_list():
                ar[idx, suggestion.subject_id] = suggestion.score
        return cls(ar.tocsr())

    def filter(self, subject_index, limit=None, threshold=0.0):
        """Return a subset of the hits, filtered by the given limit and
        score threshold, as another SuggestionBatch object.

        The filtering itself is delegated to annif.util.filter_suggestion;
        subject_index is not passed on to it."""

        # imported here rather than at module level
        from annif.util import filter_suggestion

        return SuggestionBatch(filter_suggestion(self.array, limit, threshold))

    def __getitem__(self, idx):
        """Return the suggestions of the document at row idx as a
        SparseSuggestionResult.

        Raises IndexError if idx is negative or not less than the number of
        rows."""
        if idx < 0 or idx >= len(self):
            raise IndexError
        return SparseSuggestionResult(self.array, idx)

    def __len__(self):
        """Return the number of documents (rows) in the batch."""
        return self.array.shape[0]
260