TokenSetIndex._find_subj_tsets()  (grade B)

Complexity:  Conditions 6
Size:        Total Lines 15, Code Lines 9
Duplication: Lines 0, Ratio 0 %
Importance:  Changes 0

Metric                           Value
cc   (cyclomatic complexity)     6
eloc (effective lines of code)   9
nop  (number of parameters)      2
dl   (duplicated lines)          0
loc  (lines of code)             15
rs                               8.6666
c                                0
b                                0
f                                0
"""Index for fast matching of token sets."""

from __future__ import annotations

import collections
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from numpy import ndarray
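
# Added note: the guarded import above makes numpy a typing-only
# dependency. TYPE_CHECKING is False at runtime, and thanks to
# `from __future__ import annotations` the ndarray annotation below is
# never evaluated, so the module imports without numpy installed.
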
class TokenSet:
    """Represents a set of tokens (expressed as integer token IDs) that can
    be matched with another set of tokens. A TokenSet can optionally
    be associated with a subject from the vocabulary."""

    def __init__(
        self,
        tokens: ndarray,
        subject_id: int | None = None,
        is_pref: bool = False,
    ) -> None:
        self._tokens = frozenset(tokens)
        self.key = tokens[0] if len(tokens) else None
        self.subject_id = subject_id
        self.is_pref = is_pref

    def __len__(self) -> int:
        return len(self._tokens)

    def __iter__(self):
        return iter(self._tokens)

    @property
    def tokens(self) -> frozenset:
        return self._tokens

    def contains(self, other: TokenSet) -> bool:
        """Returns True iff the tokens in the other TokenSet are all
        included within this TokenSet."""

        return other._tokens.issubset(self._tokens)

    def __setstate__(self, state):
        # Convert _tokens to a frozenset if it's a set.
        # Might happen when using models saved using Annif 1.2 or older.
        self.__dict__ = state
        if isinstance(self._tokens, set):
            self._tokens = frozenset(self._tokens)  # pragma: no cover
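
# --- Illustrative sketch (added; not part of the original module) ---
# How a TokenSet behaves. The token IDs below are invented, and plain
# lists stand in for numpy arrays: frozenset() and indexing accept both.
doc = TokenSet([1, 2, 3, 4])                         # document tokens
term = TokenSet([2, 3], subject_id=7, is_pref=True)  # vocabulary term

assert len(term) == 2
assert term.key == 2         # key is the first token supplied
assert doc.contains(term)    # {2, 3} is a subset of {1, 2, 3, 4}
assert not term.contains(doc)
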
class TokenSetIndex:
    """A searchable index of TokenSets (representing vocabulary terms)"""

    def __init__(self) -> None:
        self._index = collections.defaultdict(set)

    def __len__(self) -> int:
        return len(self._index)

    def add(self, tset: TokenSet) -> None:
        """Add a TokenSet into this index"""
        if tset.key is not None:
            self._index[tset.key].add(tset)
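
    # Added note: bucketing each TokenSet under its key (first) token is
    # sufficient for containment search, because a TokenSet can only be
    # contained in a query if all of its tokens, including the key token,
    # occur in the query. _find_subj_tsets() below therefore only needs
    # to probe the buckets of the query's own tokens.
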
    def _find_subj_tsets(self, tset: TokenSet) -> dict[int | None, TokenSet]:
        """Return a dict (subject_id : TokenSet) of matches contained in the
        given TokenSet."""

        subj_tsets = {}

        for token in tset:
            for ts in self._index[token]:
                # Keep one match per subject; a match for a preferred
                # label, once stored, is never displaced by a later one.
                if tset.contains(ts) and (
                    ts.subject_id not in subj_tsets
                    or not subj_tsets[ts.subject_id].is_pref
                ):
                    subj_tsets[ts.subject_id] = ts

        return subj_tsets
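
    # Worked example (illustrative): if the index holds a non-preferred
    # TokenSet({2, 3}, subject_id=7) and a preferred TokenSet({2, 3, 4},
    # subject_id=7), a query over tokens {1, 2, 3, 4} matches both, and
    # the preferred entry ends up in subj_tsets[7] regardless of the
    # order in which the two matches are encountered.
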
    def _find_subj_ambiguity(self, tsets: list[TokenSet]):
        """Calculate the ambiguity values (the number of other TokenSets
        that also match the same tokens) for the given TokenSets and return
        them as a dict-like object (subject_id : ambiguity_value)."""

        # group the TokenSets by their tokens, so that TokenSets with
        # identical tokens can be considered together all in one go
        elim_tsets = collections.defaultdict(set)
        for ts in tsets:
            elim_tsets[ts.tokens].add(ts.subject_id)

        subj_ambiguity = collections.Counter()

        for tokens1, subjs1 in elim_tsets.items():
            for tokens2, subjs2 in elim_tsets.items():
                if not tokens2.issuperset(tokens1):
                    continue
                # Every subject whose tokens are a superset of tokens1
                # also covers all of tokens1; count those subjects as
                # ambiguity for each subject in subjs1, excluding itself.
                for subj in subjs1:
                    subj_ambiguity[subj] += len(subjs2) - int(subj in subjs2)

        return subj_ambiguity
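
    # Worked example (illustrative): for TokenSets {2, 3} -> subject 7,
    # {2, 3} -> subject 8 and {2, 3, 4} -> subject 9, subjects 7 and 8
    # each get ambiguity 2 (the other subject with identical tokens, plus
    # the superset match for subject 9), while subject 9 gets ambiguity 0
    # because no other TokenSet covers all of its tokens.
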
    def search(self, tset: TokenSet) -> list[tuple[TokenSet, int]]:
        """Return the TokenSets that are contained in the given TokenSet.
        The matches are returned as a list of (TokenSet, ambiguity) pairs
        where ambiguity is an integer indicating the number of other TokenSets
        that also match the same tokens."""

        subj_tsets = self._find_subj_tsets(tset)
        subj_ambiguity = self._find_subj_ambiguity(subj_tsets.values())

        return [
            (ts, subj_ambiguity[subject_id]) for subject_id, ts in subj_tsets.items()
        ]
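
# --- End-to-end sketch (added; not part of the original module) ---
# Index two invented vocabulary terms and search with a document's token
# set; plain lists again stand in for numpy arrays.
index = TokenSetIndex()
index.add(TokenSet([2, 3], subject_id=7, is_pref=True))
index.add(TokenSet([2, 3, 4], subject_id=9, is_pref=True))

for ts, ambiguity in index.search(TokenSet([1, 2, 3, 4])):
    # Prints subject 7 with ambiguity 1 (the {2, 3, 4} term also covers
    # its tokens) and subject 9 with ambiguity 0, in either order.
    print(ts.subject_id, sorted(ts.tokens), ambiguity)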