Test Failed
Push to main (c197b5...802c3b) by unknown, created 06:48

klib.clean.data_cleaning()   B

Complexity: Conditions 4
Size: Total Lines 113, Code Lines 43
Duplication: Lines 0, Ratio 0 %
Importance: Changes 0

Metric  Value
cc      4
eloc    43
nop     11
dl      0
loc     113
rs      8.8478
c       0
b       0
f       0

How to fix: Long Method, Many Parameters

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include Extract Method: move a coherent, commented block of the long method into its own well-named method and call it from the original one, as sketched below for data_cleaning().
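For illustration only, the input-validation block at the top of data_cleaning() could be extracted into a helper. The helper name _validate_cleaning_inputs is hypothetical and not part of klib; the validation calls themselves are taken from the source shown further down.

def _validate_cleaning_inputs(
    drop_threshold_cols: float,
    drop_threshold_rows: float,
    drop_duplicates: bool,
    convert_dtypes: bool,
    category: bool,
    cat_threshold: float,
) -> None:
    # Hypothetical Extract Method result: the input checks copied from data_cleaning().
    _validate_input_range(drop_threshold_cols, "drop_threshold_cols", 0, 1)
    _validate_input_range(drop_threshold_rows, "drop_threshold_rows", 0, 1)
    _validate_input_bool(drop_duplicates, "drop_duplicates")
    _validate_input_bool(convert_dtypes, "convert_datatypes")
    _validate_input_bool(category, "category")
    _validate_input_range(cat_threshold, "cat_threshold", 0, 1)

data_cleaning() would then open with a single call to _validate_cleaning_inputs(...), keeping its body focused on the actual cleaning steps.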

Many Parameters

Methods with many parameters are not only hard to understand, but their parameters also often become inconsistent when you need more or different data.

There are several approaches to avoid long parameter lists: Introduce Parameter Object (bundle related values into a single object), Preserve Whole Object (pass the whole object rather than several values extracted from it), and Replace Parameter with Method Call. A sketch of the first approach for data_cleaning() follows below.
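As a hedged sketch of Introduce Parameter Object, the three category-related arguments of data_cleaning() could travel together in one object. The CategorySettings dataclass and the cat_settings parameter are hypothetical and not part of klib's API.

from dataclasses import dataclass
from typing import List, Optional, Union


@dataclass
class CategorySettings:
    # Hypothetical parameter object grouping category, cat_threshold and cat_exclude.
    category: bool = True
    cat_threshold: float = 0.03
    cat_exclude: Optional[List[Union[str, int]]] = None


# A caller would then write, for example:
# data_cleaning(df, cat_settings=CategorySettings(cat_threshold=0.05))
# instead of passing the three values individually.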

"""
Functions for data cleaning.

:author: Andreas Kanz
"""

import itertools
import re
from typing import List, Optional, Union

import numpy as np
import pandas as pd

from klib.describe import corr_mat
from klib.utils import (
    _diff_report,
    _drop_duplicates,
    _missing_vals,
    _validate_input_bool,
    _validate_input_range,
)

__all__ = [
    "clean_column_names",
    "convert_datatypes",
    "data_cleaning",
    "drop_missing",
    "mv_col_handling",
]


def _optimize_ints(data: Union[pd.Series, pd.DataFrame]) -> pd.DataFrame:
    data = pd.DataFrame(data).copy()
    ints = data.select_dtypes(include=["int64"]).columns.tolist()
    data[ints] = data[ints].apply(pd.to_numeric, downcast="integer")
    return data


def _optimize_floats(data: Union[pd.Series, pd.DataFrame]) -> pd.DataFrame:
    data = pd.DataFrame(data).copy()
    floats = data.select_dtypes(include=["float64"]).columns.tolist()
    data[floats] = data[floats].apply(pd.to_numeric, downcast="float")
    return data


def clean_column_names(data: pd.DataFrame, hints: bool = True) -> pd.DataFrame:
    """Clean the column names of the provided Pandas Dataframe and optionally \
        provides hints on duplicate and long column names.

    Parameters
    ----------
    data : pd.DataFrame
        Original Dataframe with columns to be cleaned
    hints : bool, optional
        Print out hints on column name duplication and column name length, by default \
        True

    Returns
    -------
    pd.DataFrame
        Pandas DataFrame with cleaned column names
    """
    _validate_input_bool(hints, "hints")

    # Handle CamelCase
    for i, col in enumerate(data.columns):
        matches = re.findall(re.compile("[a-z][A-Z]"), col)
        column = col
        for match in matches:
            column = column.replace(match, match[0] + "_" + match[1])
            data.rename(columns={data.columns[i]: column}, inplace=True)

    data.columns = (
        data.columns.str.replace("\n", "_", regex=False)
        .str.replace("(", "_", regex=False)
        .str.replace(")", "_", regex=False)
        .str.replace("'", "_", regex=False)
        .str.replace('"', "_", regex=False)
        .str.replace(".", "_", regex=False)
        .str.replace("-", "_", regex=False)
        .str.replace(r"[!?:;/]", "_", regex=True)
        .str.replace("+", "_plus_", regex=False)
        .str.replace("*", "_times_", regex=False)
        .str.replace("<", "_smaller", regex=False)
        .str.replace(">", "_larger_", regex=False)
        .str.replace("=", "_equal_", regex=False)
        .str.replace("ä", "ae", regex=False)
        .str.replace("ö", "oe", regex=False)
        .str.replace("ü", "ue", regex=False)
        .str.replace("ß", "ss", regex=False)
        .str.replace("%", "_percent_", regex=False)
        .str.replace("$", "_dollar_", regex=False)
        .str.replace("€", "_euro_", regex=False)
        .str.replace("@", "_at_", regex=False)
        .str.replace("#", "_hash_", regex=False)
        .str.replace("&", "_and_", regex=False)
        .str.replace(r"\s+", "_", regex=True)
        .str.replace(r"_+", "_", regex=True)
        .str.strip("_")
        .str.lower()
    )

    dupl_idx = [i for i, x in enumerate(data.columns.duplicated()) if x]
    if dupl_idx:
        dupl_before = data.columns[dupl_idx].tolist()
        data.columns = [
            col if col not in data.columns[:i] else col + "_" + str(i)
            for i, col in enumerate(data.columns)
        ]
        if hints:
            print(
                f"Duplicate column names detected! Columns with index {dupl_idx} and "
                f"names {dupl_before} have been renamed to "
                f"{data.columns[dupl_idx].tolist()}."
            )

    long_col_names = [x for x in data.columns if len(x) > 25]
    if long_col_names and hints:
        print(
            "Long column names detected (>25 characters). Consider renaming the "
            f"following columns {long_col_names}."
        )

    return data


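# Illustrative example, not part of the source file: for a DataFrame with the made-up
# columns ["Size(cm)", "weight kg", "Price €"], clean_column_names would rename them to
# ["size_cm", "weight_kg", "price_euro"] via the replacement chain above, and would print
# hints if the cleaned names collided or exceeded 25 characters.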
def convert_datatypes(
    data: pd.DataFrame,
    category: bool = True,
    cat_threshold: float = 0.05,
    cat_exclude: Optional[List[Union[str, int]]] = None,
) -> pd.DataFrame:
    """Convert columns to best possible dtypes using dtypes supporting pd.NA.

    Temporarily not converting to integers due to an issue in pandas. This is expected \
        to be fixed in pandas 1.1. See https://github.com/pandas-dev/pandas/issues/33803

    Parameters
    ----------
    data : pd.DataFrame
        2D dataset that can be coerced into Pandas DataFrame
    category : bool, optional
        Change dtypes of columns with dtype "object" to "category". Set threshold \
        using cat_threshold or exclude columns using cat_exclude, by default True
    cat_threshold : float, optional
        Ratio of unique values below which categories are inferred and column dtype is \
        changed to categorical, by default 0.05
    cat_exclude : Optional[List[Union[str, int]]], optional
        List of columns to exclude from categorical conversion, by default None

    Returns
    -------
    pd.DataFrame
        Pandas DataFrame with converted Datatypes
    """
    # Validate Inputs
    _validate_input_bool(category, "Category")
    _validate_input_range(cat_threshold, "cat_threshold", 0, 1)

    cat_exclude = [] if cat_exclude is None else cat_exclude.copy()

    data = pd.DataFrame(data).copy()
    for col in data.columns:
        unique_vals_ratio = data[col].nunique(dropna=False) / data.shape[0]
        if (
            category
            and unique_vals_ratio < cat_threshold
            and col not in cat_exclude
            and data[col].dtype == "object"
        ):
            data[col] = data[col].astype("category")

        data[col] = data[col].convert_dtypes(
            infer_objects=True,
            convert_string=True,
            convert_integer=False,
            convert_boolean=True,
        )

    data = _optimize_ints(data)
    data = _optimize_floats(data)

    return data


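# Illustrative example, not part of the source file; the DataFrame and column names are made up:
# df = pd.DataFrame({"city": ["Berlin", "Hamburg", "Berlin"] * 100, "count": [1, 2, 3] * 100})
# converted = convert_datatypes(df, cat_threshold=0.05)
# "city" (3 unique values in 300 rows, ratio 0.01 < 0.05) would become a 'category' column,
# while "count" would be downcast from int64 to a smaller integer dtype by _optimize_ints.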
def drop_missing(
    data: pd.DataFrame,
    drop_threshold_cols: float = 1,
    drop_threshold_rows: float = 1,
    col_exclude: Optional[List[str]] = None,
) -> pd.DataFrame:
    """Drop completely empty columns and rows by default and optionally provides \
        flexibility to loosen restrictions to drop additional non-empty columns and \
        rows based on the fraction of NA-values.

    Parameters
    ----------
    data : pd.DataFrame
        2D dataset that can be coerced into Pandas DataFrame
    drop_threshold_cols : float, optional
        Drop columns with NA-ratio equal to or above the specified threshold, by \
        default 1
    drop_threshold_rows : float, optional
        Drop rows with NA-ratio equal to or above the specified threshold, by default 1
    col_exclude : Optional[List[str]], optional
        Specify a list of columns to exclude from dropping. The excluded columns do \
        not affect the drop thresholds, by default None

    Returns
    -------
    pd.DataFrame
        Pandas DataFrame without any empty columns or rows

    Notes
    -----
    Columns are dropped first
    """
    # Validate Inputs
    _validate_input_range(drop_threshold_cols, "drop_threshold_cols", 0, 1)
    _validate_input_range(drop_threshold_rows, "drop_threshold_rows", 0, 1)

    col_exclude = [] if col_exclude is None else col_exclude.copy()
    data_exclude = data[col_exclude]

    data = pd.DataFrame(data).copy()

    data_dropped = data.drop(columns=col_exclude, errors="ignore")
    data_dropped = data_dropped.drop(
        columns=data_dropped.loc[
            :, _missing_vals(data)["mv_cols_ratio"] > drop_threshold_cols
        ].columns
    ).dropna(axis=1, how="all")

    data = pd.concat([data_dropped, data_exclude], axis=1)

    return data.drop(
        index=data.loc[
            _missing_vals(data)["mv_rows_ratio"] > drop_threshold_rows, :
        ].index
    ).dropna(axis=0, how="all")


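# Illustrative behaviour, not part of the source file: with the defaults
# (drop_threshold_cols=1, drop_threshold_rows=1) only columns and rows that consist
# entirely of missing values are removed, via the trailing .dropna(how="all") calls.
# A call such as drop_missing(df, drop_threshold_cols=0.5) would additionally drop
# columns whose share of missing values exceeds 50%, keeping anything in col_exclude.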
def data_cleaning(
    data: pd.DataFrame,
    drop_threshold_cols: float = 0.9,
    drop_threshold_rows: float = 0.9,
    drop_duplicates: bool = True,
    convert_dtypes: bool = True,
    col_exclude: Optional[List[str]] = None,
    category: bool = True,
    cat_threshold: float = 0.03,
    cat_exclude: Optional[List[Union[str, int]]] = None,
    clean_col_names: bool = True,
    show: str = "changes",
) -> pd.DataFrame:
    """Perform initial data cleaning tasks on a dataset, such as dropping single \
        valued and empty rows, empty columns as well as optimizing the datatypes.

    Parameters
    ----------
    data : pd.DataFrame
        2D dataset that can be coerced into Pandas DataFrame
    drop_threshold_cols : float, optional
        Drop columns with NA-ratio equal to or above the specified threshold, by \
        default 0.9
    drop_threshold_rows : float, optional
        Drop rows with NA-ratio equal to or above the specified threshold, by \
        default 0.9
    drop_duplicates : bool, optional
        Drop duplicate rows, keeping the first occurrence. This step comes after the \
        dropping of missing values, by default True
    convert_dtypes : bool, optional
        Convert dtypes using pd.convert_dtypes(), by default True
    col_exclude : Optional[List[str]], optional
        Specify a list of columns to exclude from dropping, by default None
    category : bool, optional
        Enable changing dtypes of "object" columns to "category". Set threshold using \
        cat_threshold. Requires convert_dtypes=True, by default True
    cat_threshold : float, optional
        Ratio of unique values below which categories are inferred and column dtype is \
        changed to categorical, by default 0.03
    cat_exclude : Optional[List[Union[str, int]]], optional
        List of columns to exclude from categorical conversion, by default None
    clean_col_names : bool, optional
        Cleans the column names and provides hints on duplicate and long names, by \
        default True
    show : str, optional
        {"all", "changes", None}, by default "changes"
        Specify verbosity of the output:

            * "all": Print information about the data before and after cleaning as \
            well as information about changes and memory usage (deep). Please be \
            aware that this can slow down the function by quite a bit.
            * "changes": Print out differences in the data before and after cleaning.
            * None: No information about the data and the data cleaning is printed.

    Returns
    -------
    pd.DataFrame
        Cleaned Pandas DataFrame

    See Also
    --------
    convert_datatypes: Convert columns to best possible dtypes.
    drop_missing : Flexibly drop columns and rows.
    _memory_usage: Gives the total memory usage in megabytes.
    _missing_vals: Metrics about missing values in the dataset.

    Notes
    -----
    The category dtype is not grouped in the summary, unless it contains exactly the \
    same categories.
    """
    # Validate Inputs
    _validate_input_range(drop_threshold_cols, "drop_threshold_cols", 0, 1)
    _validate_input_range(drop_threshold_rows, "drop_threshold_rows", 0, 1)
    _validate_input_bool(drop_duplicates, "drop_duplicates")
    _validate_input_bool(convert_dtypes, "convert_datatypes")
    _validate_input_bool(category, "category")
    _validate_input_range(cat_threshold, "cat_threshold", 0, 1)

    data = pd.DataFrame(data).copy()
    data_cleaned = drop_missing(
        data, drop_threshold_cols, drop_threshold_rows, col_exclude=col_exclude
    )

    if clean_col_names:
        data_cleaned = clean_column_names(data_cleaned)

    single_val_cols = data_cleaned.columns[
        data_cleaned.nunique(dropna=False) == 1
    ].tolist()
    data_cleaned = data_cleaned.drop(columns=single_val_cols)

    dupl_rows = None

    if drop_duplicates:
        data_cleaned, dupl_rows = _drop_duplicates(data_cleaned)
    if convert_dtypes:
        data_cleaned = convert_datatypes(
            data_cleaned,
            category=category,
            cat_threshold=cat_threshold,
            cat_exclude=cat_exclude,
        )

    _diff_report(
        data,
        data_cleaned,
        dupl_rows=dupl_rows,
        single_val_cols=single_val_cols,
        show=show,
    )

    return data_cleaned


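# Illustrative usage, not part of the source file; df is a placeholder DataFrame:
# import klib
# df_cleaned = klib.data_cleaning(df, drop_threshold_cols=0.9, drop_threshold_rows=0.9)
# With the defaults this runs drop_missing, clean_column_names, single-value column
# removal, _drop_duplicates and convert_datatypes, then prints a _diff_report
# summarising the changes (show="changes").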
def mv_col_handling(
    data: pd.DataFrame,
    target: Optional[Union[str, pd.Series, List]] = None,
    mv_threshold: float = 0.1,
    corr_thresh_features: float = 0.5,
    corr_thresh_target: float = 0.3,
    return_details: bool = False,
) -> pd.DataFrame:
    """Convert columns with a high ratio of missing values into binary features and \
    eventually drops them based on their correlation with other features and the \
    target variable.

    This function follows a three step process:
    - 1) Identify features with a high ratio of missing values (above 'mv_threshold').
    - 2) Identify high correlations of these features among themselves and with \
        other features in the dataset (above 'corr_thresh_features').
    - 3) Features with high ratio of missing values and high correlation among each \
        other are dropped unless they correlate reasonably well with the target \
        variable (above 'corr_thresh_target').

    Note: If no target is provided, the process exits after step two and drops columns \
    identified up to this point.

    Parameters
    ----------
    data : pd.DataFrame
        2D dataset that can be coerced into Pandas DataFrame
    target : Optional[Union[str, pd.Series, List]], optional
        Specify target for correlation. I.e. label column to generate only the \
        correlations between each feature and the label, by default None
    mv_threshold : float, optional
        Value between 0 <= threshold <= 1. Features with a missing-value-ratio larger \
        than mv_threshold are candidates for dropping and undergo further analysis, by \
        default 0.1
    corr_thresh_features : float, optional
        Value between 0 <= threshold <= 1. Maximum correlation a previously identified \
        feature (with a high mv-ratio) is allowed to have with another feature. If \
        this threshold is overstepped, the feature undergoes further analysis, by \
        default 0.5
    corr_thresh_target : float, optional
        Value between 0 <= threshold <= 1. Minimum required correlation of a remaining \
        feature (i.e. feature with a high mv-ratio and high correlation to another \
        existing feature) with the target. If this threshold is not met the feature is \
        ultimately dropped, by default 0.3
    return_details : bool, optional
        Provides flexibility to return intermediary results, by default False

    Returns
    -------
    pd.DataFrame
        Updated Pandas DataFrame

    optional:
    cols_mv: Columns with missing values included in the analysis
    drop_cols: List of dropped columns
    """
    # Validate Inputs
    _validate_input_range(mv_threshold, "mv_threshold", 0, 1)
    _validate_input_range(corr_thresh_features, "corr_thresh_features", 0, 1)
    _validate_input_range(corr_thresh_target, "corr_thresh_target", 0, 1)

    data = pd.DataFrame(data).copy()
    data_local = data.copy()
    mv_ratios = _missing_vals(data_local)["mv_cols_ratio"]
    cols_mv = mv_ratios[mv_ratios > mv_threshold].index.tolist()
    data_local[cols_mv] = (
        data_local[cols_mv].applymap(lambda x: 1 if not pd.isnull(x) else x).fillna(0)
    )

    high_corr_features = []
    data_temp = data_local.copy()
    for col in cols_mv:
        corrmat = corr_mat(data_temp, colored=False)
        if abs(corrmat[col]).nlargest(2)[1] > corr_thresh_features:
            high_corr_features.append(col)
            data_temp = data_temp.drop(columns=[col])

    drop_cols = []
    if target is None:
        data = data.drop(columns=high_corr_features)
    else:
        corrs = corr_mat(data_local, target=target, colored=False).loc[
            high_corr_features
        ]
        drop_cols = corrs.loc[abs(corrs.iloc[:, 0]) < corr_thresh_target].index.tolist()
        data = data.drop(columns=drop_cols)

    if return_details:
        return data, cols_mv, drop_cols

    return data


def pool_duplicate_subsets(
    data: pd.DataFrame,
    col_dupl_thresh: float = 0.2,
    subset_thresh: float = 0.2,
    min_col_pool: int = 3,
    exclude: Optional[List[str]] = None,
    return_details=False,
) -> pd.DataFrame:
    """Check for duplicates in subsets of columns and pools them. This can reduce \
        the number of columns in the data without losing much information. Suitable \
        columns are combined to subsets and tested for duplicates. In case sufficient \
        duplicates can be found, the respective columns are aggregated into a \
        "pooled_vars" column. Identical numbers in the "pooled_vars" column indicate \
        identical information in the respective rows.

        Note: It is advised to exclude features that provide sufficient informational \
        content by themselves as well as the target column by using the "exclude" \
        setting.

    Parameters
    ----------
    data : pd.DataFrame
        2D dataset that can be coerced into Pandas DataFrame
    col_dupl_thresh : float, optional
        Columns with a ratio of duplicates higher than "col_dupl_thresh" are \
        considered in the further analysis. Columns with a lower ratio are not \
        considered for pooling, by default 0.2
    subset_thresh : float, optional
        The first subset with a duplicate threshold higher than "subset_thresh" is \
        chosen and aggregated. If no subset reaches the threshold, the algorithm \
        continues with continuously smaller subsets until "min_col_pool" is reached, \
        by default 0.2
    min_col_pool : int, optional
        Minimum number of columns to pool. The algorithm attempts to combine as many \
        columns as possible to suitable subsets and stops when "min_col_pool" is \
        reached, by default 3
    exclude : Optional[List[str]], optional
        List of column names to be excluded from the analysis. These columns are \
        passed through without modification, by default None
    return_details : bool, optional
        Provides flexibility to return intermediary results, by default False

    Returns
    -------
    pd.DataFrame
        DataFrame with low cardinality columns pooled

    optional:
    subset_cols: List of columns used as subset
    """
    # Input validation
    _validate_input_range(col_dupl_thresh, "col_dupl_thresh", 0, 1)
    _validate_input_range(subset_thresh, "subset_thresh", 0, 1)
    _validate_input_range(min_col_pool, "min_col_pool", 0, data.shape[1])

    excluded_cols = []
    if exclude is not None:
        excluded_cols = data[exclude]
        data = data.drop(columns=exclude)

    subset_cols = []
    for i in range(data.shape[1] + 1 - min_col_pool):
        # Consider only columns with lots of duplicates
        check_list = [
            col
            for col in data.columns
            if data.duplicated(subset=col).mean() > col_dupl_thresh
        ]

        # Identify all possible combinations for the current iteration
        if check_list:
            combinations = itertools.combinations(check_list, len(check_list) - i)
        else:
            continue

        # Check subsets for all possible combinations
        ratios = [
            *map(lambda comb: data.duplicated(subset=list(comb)).mean(), combinations)
        ]
        max_idx = np.argmax(ratios)

        if max(ratios) > subset_thresh:
            # Get the best possible iterator and process the data
            best_subset = itertools.islice(
                itertools.combinations(check_list, len(check_list) - i),
                max_idx,
                max_idx + 1,
            )

            best_subset = data[list(list(best_subset)[0])]
            subset_cols = best_subset.columns.tolist()

            unique_subset = (
                best_subset.drop_duplicates()
                .reset_index()
                .rename(columns={"index": "pooled_vars"})
            )
            data = data.merge(unique_subset, how="left", on=subset_cols).drop(
                columns=subset_cols
            )
            data.index = pd.RangeIndex(len(data))
            break

    data = pd.concat([data, pd.DataFrame(excluded_cols)], axis=1)

    if return_details:
        return data, subset_cols

    return data
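To round off the listing, a hedged usage sketch for the two remaining helpers; df and the column name "label" are placeholders, not part of the source:

# Binarize high-missing-value columns and drop those that correlate strongly with other
# features but not (given a target) with the target.
reduced, cols_mv, dropped = mv_col_handling(df, target="label", return_details=True)

# Collapse a subset of largely duplicated columns into a single "pooled_vars" key column.
pooled, subset_cols = pool_duplicate_subsets(df, exclude=["label"], return_details=True)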