Passed
Push — issue678-refactor-suggestionre... ( e2c657...5b266a )
by Osma
02:54
created

VectorSuggestionResult.subject_order()   A

Complexity

Conditions 2

Size

Total Lines 5
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 5
dl 0
loc 5
rs 10
c 0
b 0
f 0
cc 2
nop 1
1
"""Representing suggested subjects."""
2
3
import abc
4
import collections
5
import itertools
6
7
import numpy as np
8
from scipy.sparse import dok_array
9
10
SubjectSuggestion = collections.namedtuple("SubjectSuggestion", "subject_id score")
11
WeightedSuggestionsBatch = collections.namedtuple(
12
    "WeightedSuggestionsBatch", "hit_sets weight subjects"
13
)
14
15
16
def filter_suggestion(preds, limit=None, threshold=0.0):
17
    """filter a 2D sparse suggestion array (csr_array), retaining only the
18
    top K suggestions with a score above or equal to the threshold for each
19
    individual prediction; the rest will be left as zeros"""
20
21
    filtered = dok_array(preds.shape, dtype=np.float32)
22
    for row in range(preds.shape[0]):
23
        arow = preds.getrow(row)
24
        top_k = arow.data.argsort()[::-1]
25
        if limit is not None:
26
            top_k = top_k[:limit]
27
        for idx in top_k:
28
            val = arow.data[idx]
29
            if val < threshold:
30
                break
31
            filtered[row, arow.indices[idx]] = val
32
    return filtered.tocsr()
33
34
35
class SuggestionResult(metaclass=abc.ABCMeta):
36
    """Abstract base class for a set of hits returned by an analysis
37
    operation."""
38
39
    @abc.abstractmethod
40
    def __iter__(self):
41
        """Return the hits as an iterator that returns SubjectSuggestion objects,
42
        highest scores first."""
43
        pass  # pragma: no cover
44
45
    @abc.abstractmethod
46
    def as_vector(self, size, destination=None):
47
        """Return the hits as a one-dimensional score vector of given size.
48
        If destination array is given (not None) it will be used, otherwise a
49
        new array will be created."""
50
        pass  # pragma: no cover
51
52
    @abc.abstractmethod
53
    def __len__(self):
54
        """Return the number of hits with non-zero scores."""
55
        pass  # pragma: no cover
56
57
58
class VectorSuggestionResult(SuggestionResult):
59
    """SuggestionResult implementation based primarily on NumPy vectors."""
60
61
    def __init__(self, vector):
62
        vector_f32 = vector.astype(np.float32)
63
        # limit scores to the range 0.0 .. 1.0
64
        self._vector = np.minimum(np.maximum(vector_f32, 0.0), 1.0)
65
        self._subject_order = None
66
        self._lsr = None
67
68
    def _vector_to_list_suggestion(self):
69
        hits = []
70
        for subject_id in self.subject_order:
71
            score = self._vector[subject_id]
72
            if score <= 0.0:
73
                break  # we can skip the remaining ones
74
            hits.append(SubjectSuggestion(subject_id=subject_id, score=float(score)))
75
        return ListSuggestionResult(hits)
76
77
    @property
78
    def subject_order(self):
79
        if self._subject_order is None:
80
            self._subject_order = np.argsort(self._vector)[::-1]
81
        return self._subject_order
82
83
    def __iter__(self):
84
        if self._lsr is None:
85
            self._lsr = self._vector_to_list_suggestion()
86
        return iter(self._lsr)
87
88
    def as_vector(self, size, destination=None):
89
        if destination is not None:
90
            np.copyto(destination, self._vector)
91
            return destination
92
        return self._vector
93
94
    def __len__(self):
95
        return (self._vector > 0.0).sum()
96
97
98
class ListSuggestionResult(SuggestionResult):
99
    """SuggestionResult implementation based primarily on lists of hits."""
100
101
    def __init__(self, hits):
102
        self._list = [self._enforce_score_range(hit) for hit in hits if hit.score > 0.0]
103
        self._vector = None
104
105
    @staticmethod
106
    def _enforce_score_range(hit):
107
        if hit.score > 1.0:
108
            return hit._replace(score=1.0)
109
        return hit
110
111
    def _list_to_vector(self, size, destination):
112
        if destination is None:
113
            destination = np.zeros(size, dtype=np.float32)
114
115
        for hit in self._list:
116
            if hit.subject_id is not None:
117
                destination[hit.subject_id] = hit.score
118
        return destination
119
120
    def __iter__(self):
121
        return iter(self._list)
122
123
    def as_vector(self, size, destination=None):
124
        if self._vector is None:
125
            self._vector = self._list_to_vector(size, destination)
126
        return self._vector
127
128
    def __len__(self):
129
        return len(self._list)
130
131
132
class SparseSuggestionResult(SuggestionResult):
133
    """SuggestionResult implementation backed by a single row of a sparse array."""
134
135
    def __init__(self, array, idx):
136
        self._array = array
137
        self._idx = idx
138
139
    def __iter__(self):
140
        _, cols = self._array[[self._idx], :].nonzero()
141
        suggestions = [
142
            SubjectSuggestion(subject_id=col, score=float(self._array[self._idx, col]))
143
            for col in cols
144
        ]
145
        return iter(
146
            sorted(suggestions, key=lambda suggestion: suggestion.score, reverse=True)
147
        )
148
149
    def as_vector(self, size, destination=None):
150
        if destination is not None:
151
            print("as_vector called with destination not None")
152
            return None
153
        return self._array[[self._idx], :].toarray()[0]
154
155
    def __len__(self):
156
        _, cols = self._array[[self._idx], :].nonzero()
157
        return len(cols)
158
159
160
class SuggestionBatch:
161
    """Subject suggestions for a batch of documents."""
162
163
    def __init__(self, array):
164
        """Create a new SuggestionBatch from a csr_array"""
165
        self.array = array
166
167
    @classmethod
168
    def from_sequence(cls, suggestion_results, vocab_size, limit=None):
169
        """Create a new SuggestionBatch from a sequence of SuggestionResult objects."""
170
171
        # create a dok_array for fast construction
172
        ar = dok_array((len(suggestion_results), vocab_size), dtype=np.float32)
173
        for idx, result in enumerate(suggestion_results):
174
            for suggestion in itertools.islice(result, limit):
175
                ar[idx, suggestion.subject_id] = suggestion.score
176
        return cls(ar.tocsr())
177
178
    def filter(self, limit=None, threshold=0.0):
179
        """Return a subset of the hits, filtered by the given limit and
180
        score threshold, as another SuggestionBatch object."""
181
182
        return SuggestionBatch(filter_suggestion(self.array, limit, threshold))
183
184
    def __getitem__(self, idx):
185
        if idx < 0 or idx >= len(self):
186
            raise IndexError
187
        return SparseSuggestionResult(self.array, idx)
188
189
    def __len__(self):
190
        return self.array.shape[0]
191
192
193
class SuggestionResults:
194
    """Subject suggestions for a potentially very large number of documents."""
195
196
    def __init__(self, batches):
197
        """Initialize a new SuggestionResults from an iterable that provides
198
        SuggestionBatch objects."""
199
200
        self.batches = batches
201
202
    def filter(self, limit=None, threshold=0.0):
203
        """Return a view of these suggestions, filtered by the given limit
204
        and/or threshold, as another SuggestionResults object."""
205
206
        return SuggestionResults(
207
            (batch.filter(limit, threshold) for batch in self.batches)
208
        )
209
210
    def __iter__(self):
211
        return iter(itertools.chain.from_iterable(self.batches))
212