typeddfs.matrix_dfs   B
last analyzed

Complexity

Total Complexity 48

Size/Duplication

Total Lines 311
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 161
dl 0
loc 311
rs 8.5599
c 0
b 0
f 0
wmc 48

26 Methods

Rating   Name   Duplication   Size   Complexity  
A _MatrixDf.convert() 0 24 5
A MatrixDf.get_typing() 0 3 1
A _MatrixDf.__repr__() 0 3 1
A _MatrixDf.sub_matrix() 0 5 1
A _MatrixDf.triangle() 0 11 3
A AffinityMatrixDf.new_df() 0 22 2
A _MatrixDf.shuffle() 0 14 3
A _MatrixDf.__str__() 0 2 1
A _MatrixDf.dim_str() 0 7 1
A _MatrixDf.long_form() 0 12 3
A MatrixDf.new_df() 0 31 3
A _MatrixDf.cols() 0 6 1
A _MatrixDf.diagonals() 0 5 1
A _MatrixDf.rows() 0 6 1
A AffinityMatrixDf.symmetrize() 0 5 1
A _MatrixDf.dims() 0 6 1
A _MatrixDf.is_symmetric() 0 5 1
A _MatrixDf._repr_html_() 0 5 2
A AffinityMatrixDf.get_typing() 0 3 1
A _MatrixDf.flatten() 0 5 1
A _MatrixDf.sort_alphabetical() 0 8 1
B _MatrixDf._check() 0 14 6
A AffinityMatrixDf.__repr__() 0 3 1
A LongFormMatrixDf.get_typing() 0 3 1
A AffinityMatrixDf._check() 0 12 4
A AffinityMatrixDf.__str__() 0 2 1

How to fix   Complexity   

Complexity

Complex classes like typeddfs.matrix_dfs often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# SPDX-FileCopyrightText: Copyright 2020-2023, Contributors to typed-dfs
2
# SPDX-PackageHomePage: https://github.com/dmyersturnbull/typed-dfs
3
# SPDX-License-Identifier: Apache-2.0
4
"""
5
DataFrames that are essentially n-by-m matrices.
6
"""
7
from __future__ import annotations
8
9
import abc
10
from copy import deepcopy
11
from functools import partial
12
from inspect import cleandoc
13
from typing import TYPE_CHECKING
14
15
import numpy as np
16
import pandas as pd
17
18
from typeddfs.base_dfs import BaseDf
19
from typeddfs.df_errors import (
20
    InvalidDfError,
21
    RowColumnMismatchError,
22
    VerificationFailedError,
23
)
24
from typeddfs.df_typing import FINAL_DF_TYPING, DfTyping
25
from typeddfs.typed_dfs import TypedDf
26
27
if TYPE_CHECKING:
28
    from collections.abc import Sequence
29
30
    from numpy.random import RandomState
31
32
33
class LongFormMatrixDf(TypedDf):
    """
    Long-form ("melted") representation of a matrix.

    Requires the columns "row", "column", and "value".
    """

    @classmethod
    def get_typing(cls) -> DfTyping:
        # The three columns every long-form matrix must carry.
        required = ["row", "column", "value"]
        return DfTyping(_required_columns=required)
41
42
43
class _MatrixDf(BaseDf, metaclass=abc.ABCMeta):
    """
    Shared base for matrix-like DataFrames.

    After ``convert``, the index is named "row", the columns axis is named "column",
    and the labels on both axes are strings.
    """

    @classmethod
    def convert(cls, df: pd.DataFrame) -> __qualname__:
        """
        Converts a DataFrame into an instance of this class and validates it.

        If the index is unnamed and a column called "row" exists, that column becomes the index.
        Values are cast to the typing's ``value_dtype`` when one is set,
        and row and column labels are cast to str.

        NOTE: The class of the passed-in ``df`` is reassigned in place,
        and its axis names may be modified in place as well.

        Arguments:
            df: Any pandas DataFrame

        Raises:
            TypeError: If ``df`` is not a DataFrame
            InvalidDfError: If index names or columns are non-str (see ``_check``)
            VerificationFailedError: If a verification from the typing fails (see ``_check``)
        """
        if not isinstance(df, pd.DataFrame):
            msg = f"Can't convert {type(df)} to {cls.__name__}"
            raise TypeError(msg)
        df.__class__ = cls
        t = cls.get_typing()
        # an unnamed index plus a "row" column means the labels are still stored as data
        if df.index.names == [None] and "row" in df.columns:
            df = df.set_index("row")
        df.columns.name = "column"
        df.index.name = "row"
        if t.value_dtype is not None:
            df = df.astype(t.value_dtype)
        df.index = df.index.astype(str)
        df.columns = df.columns.astype(str)
        # the calls above may have returned a new object, so set the class again
        df.__class__ = cls
        # noinspection PyProtectedMember
        cls._check(df)
        return df

    @classmethod
    def _check(cls, df) -> None:
        """
        Validates ``df`` against the requirements of this class.

        Raises:
            InvalidDfError: If any index name or column label is not a str
            VerificationFailedError: If a function in the typing's ``verifications``
                                     returns anything other than None or True
        """
        t = cls.get_typing()
        # TODO: Why doesn't .dtype work?
        if [str(c) for c in df.index.names] != list(df.index.names):
            msg = "Some index names are non-str"
            raise InvalidDfError(msg)
        if [str(c) for c in df.columns] != df.columns.tolist():
            msg = "Some columns are non-str"
            raise InvalidDfError(msg)
        for req in t.verifications:
            value = req(df)
            if value is not None and value is not True:
                raise VerificationFailedError(str(value))

    def is_symmetric(self) -> bool:
        """
        Returns True if the matrix is fully symmetric with exact equality.
        The row labels must also equal the column labels.
        """
        return self.rows == self.cols and np.array_equal(self.values, self.T.values)

    def sub_matrix(self, rows: set[str], cols: set[str]) -> __qualname__:
        """
        Returns a matrix containing only these labels.

        Arguments:
            rows: Row labels to keep
            cols: Column labels to keep

        The result follows the iteration order of ``rows`` and ``cols``.
        """
        # pandas does not accept sets as indexers, so materialize them as lists
        return self.__class__(self.loc[list(rows), list(cols)])

    def long_form(self) -> LongFormMatrixDf:
        """
        Melts into a long-form DataFrame with columns "row", "column", and "value".
        Cells are emitted in row-major order.

        Consider calling ``triangle`` first if the matrix is (always) symmetric.
        """
        row_labels, col_labels = self.rows, self.cols
        # repeat/tile reproduce the row-major order of ``ravel()``;
        # an empty matrix still yields all three columns
        data = {
            "row": np.repeat(row_labels, len(col_labels)),
            "column": np.tile(col_labels, len(row_labels)),
            "value": self.values.ravel(),
        }
        return LongFormMatrixDf.convert(pd.DataFrame(data, columns=["row", "column", "value"]))

    def triangle(self, upper: bool = False, strict: bool = False) -> __qualname__:
        """
        NaNs out the upper (or lower) triangle, returning a copy.

        Arguments:
            upper: Keep the upper triangular matrix instead of the lower
            strict: Discard the diagonal (set it to NaN)
        """
        ones = np.ones(self.shape, dtype=bool)
        if upper:
            # k=1 starts one above the main diagonal, dropping the diagonal itself
            keep = np.triu(ones, k=1 if strict else 0)
        else:
            # k=-1 stops one below the main diagonal, dropping the diagonal itself
            keep = np.tril(ones, k=-1 if strict else 0)
        return self.__class__(self.where(keep))

    def sort_alphabetical(self) -> __qualname__:
        """
        Sorts by the rows and columns alphabetically.
        """
        # sort the rows, then transpose so the same index sort applies to the columns
        df = self.sort_natural_index()
        df = df.transpose().sort_natural_index()
        df = df.transpose()
        return df

    def shuffle(self, rand: None | int | RandomState = None) -> __qualname__:
        """
        Returns a copy with every value mapped to a new location.
        Destroys the correct links between labels and values.
        Useful for permutation tests.

        Arguments:
            rand: A seed, a ``RandomState``, or None to use a fresh ``RandomState``
        """
        # ``flatten`` already returns a copy, so shuffling in place cannot touch ``self``
        cp = self.flatten()
        if rand is None:
            rand = np.random.RandomState()
        elif isinstance(rand, int):
            rand = np.random.RandomState(seed=rand)
        rand.shuffle(cp)
        values = cp.reshape((len(self.rows), len(self.columns)))
        return self.__class__(values, index=self.rows, columns=self.columns)

    def diagonals(self) -> np.ndarray:
        """
        Returns an array of the diagonal elements.
        """
        return pd.Series(np.diag(self), index=[self.index, self.columns]).values

    def flatten(self) -> np.ndarray:
        """
        Flattens the values into a 1-d array.
        """
        return self.values.flatten()

    @property
    def dim_str(self) -> str:
        """
        Returns a simple string of n_rows by n_columns.
        E.g.: ``15 x 15``.
        """
        return f"{len(self.rows)} x {len(self.columns)}"

    @property
    def dims(self) -> tuple[int, int]:
        """
        Returns (n rows, n_columns).
        """
        return len(self.rows), len(self.columns)

    @property
    def rows(self) -> Sequence[str]:
        """
        Returns the row labels.
        """
        return self.index.tolist()

    @property
    def cols(self) -> Sequence[str]:
        """
        Returns the column labels.
        """
        return self.columns.tolist()

    def _repr_html_(self) -> str:
        """
        Returns an HTML representation: a header with the class name, dimensions,
        and a validity mark, followed by the standard pandas HTML table.
        """
        cls = self.__class__
        mark = "✅" if self.__class__.is_valid(self) else "❌"
        return cleandoc(
            f"""
            <strong>{cls.__name__}: {self.dim_str} {mark}</strong>
            {pd.DataFrame._repr_html_(self)}
        """,
        )

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}({self.dim_str} @ {hex(id(self))})"

    def __str__(self) -> str:
        return f"{self.__class__.__name__}({self.dim_str})"
203
204
205
class MatrixDf(_MatrixDf):
    """
    A dataframe that is best thought of as a simple matrix.
    Contains a single index level and a list of columns,
    with numerical values of a single dtype.
    """

    @classmethod
    def get_typing(cls) -> DfTyping:
        return FINAL_DF_TYPING  # default only -- should be overridden

    @classmethod
    def new_df(
        cls,
        rows: int | Sequence[str] = 0,
        cols: int | Sequence[str] = 0,
        fill: int | float | complex = 0,
    ) -> __qualname__:
        """
        Returns a valid DataFrame of the requested shape with every cell set to ``fill``.

        Arguments:
            rows: Either a number of rows or a sequence of labels.
                  If a number is given, will choose (str-type) labels '0', '1', ...
            cols: Either a number of columns or a sequence of labels.
                  If a number is given, will choose (str-type) labels '0', '1', ...
            fill: A value to fill in every cell.
                  Should be compatible with the typing's ``value_dtype``, if one is set.

        Raises:
            InvalidDfError: If index names or columns fail the checks in ``convert``.
            VerificationFailedError: If a function in ``verifications`` fails.
            IntCastingNaNError: If ``fill`` is NaN or inf and the ``value_dtype`` does not support it.
        """
        if isinstance(rows, int):
            rows = [str(r) for r in range(rows)]
        if isinstance(cols, int):
            cols = [str(c) for c in range(cols)]
        # float by default; only a complex fill needs a complex buffer
        dtype = complex if isinstance(fill, complex) else float
        a = np.full((len(rows), len(cols)), fill, dtype=dtype)
        # pass the row labels explicitly so they are not replaced by 0..n-1;
        # naming the index "row" up front keeps ``convert`` from looking for a "row" column
        index = pd.Index(list(rows), name="row")
        df = pd.DataFrame(a, index=index, columns=list(cols))
        return cls.convert(df)
247
248
249
class AffinityMatrixDf(_MatrixDf):
    """
    A similarity or distance matrix.
    The rows and columns must match, and only 1 index is allowed.
    """

    @classmethod
    def get_typing(cls) -> DfTyping:
        return FINAL_DF_TYPING  # default only -- should be overridden

    @classmethod
    def new_df(cls, n: int | Sequence[str] = 0, fill: int | float | complex = 0) -> __qualname__:
        """
        Returns a valid square DataFrame with every cell set to ``fill``.

        Arguments:
            n:    Either a number of rows/columns or a sequence of labels.
                  If a number is given, will choose (str-type) labels '0', '1', ...
            fill: A value to fill in every cell.
                  Should be compatible with the typing's ``value_dtype``, if one is set.

        Raises:
            RowColumnMismatchError: If the row labels do not equal the column labels.
            VerificationFailedError: If a function in ``verifications`` fails.
            IntCastingNaNError: If ``fill`` is NaN or inf and the ``value_dtype`` does not support it.
        """
        if isinstance(n, int):
            n = [str(c) for c in range(n)]
        labels = list(n)
        # float by default; only a complex fill needs a complex buffer
        dtype = complex if isinstance(fill, complex) else float
        a = np.full((len(labels), len(labels)), fill, dtype=dtype)
        # a named index avoids a temporary "row" column, which would clash with a label called "row"
        index = pd.Index(labels, name="row")
        df = pd.DataFrame(a, index=index, columns=labels)
        return cls.convert(df)

    @classmethod
    def _check(cls, df: BaseDf):
        """
        Validates that the row labels equal the column labels and runs the typing's verifications.

        Raises:
            RowColumnMismatchError: If the row labels differ from the column labels
            VerificationFailedError: If a function in the typing's ``verifications``
                                     returns anything other than None or True
        """
        rows = df.index.tolist()
        cols = df.columns.tolist()
        t = cls.get_typing()
        if rows != cols:
            msg = f"Rows {rows} but columns {cols}"
            raise RowColumnMismatchError(msg, rows=rows, columns=cols)
        for req in t.verifications:
            value = req(df)
            # a verification passes by returning None or True; anything else is a failure message
            if value is not None and value is not True:
                raise VerificationFailedError(str(value))

    def symmetrize(self) -> __qualname__:
        """
        Averages with its transpose, forcing it to be symmetric.
        """
        return self.__class__(0.5 * (self + self.T))
308
309
310
# Names exported by a star-import of this module; the private base class ``_MatrixDf`` is not listed.
__all__ = ["MatrixDf", "AffinityMatrixDf", "LongFormMatrixDf"]
311