XLSSerializer._get_sheet_index_by_name()   A
last analyzed

Complexity

Conditions 2

Size

Total Lines 7
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 7
nop 3
dl 0
loc 7
rs 10
c 0
b 0
f 0
1
import fsutil
2
from openpyxl import load_workbook
3
from slugify import slugify
4
from xlrd import open_workbook
5
6
from benedict.serializers.abstract import AbstractSerializer
7
8
9
class XLSSerializer(AbstractSerializer):
10
    """
11
    This class describes a xls serializer.
12
    """
13
14
    def __init__(self):
15
        super().__init__(
16
            extensions=[
17
                "xls",
18
                "xlsx",
19
                "xlsm",
20
            ],
21
        )
22
23
    def _get_sheet_index_and_name_from_options(self, **kwargs):
24
        sheet_index_or_name = kwargs.pop("sheet", 0)
25
        sheet_index = 0
26
        sheet_name = ""
27
        if isinstance(sheet_index_or_name, int):
28
            sheet_index = sheet_index_or_name
29
        elif isinstance(sheet_index_or_name, str):
30
            sheet_name = sheet_index_or_name
31
        return (sheet_index, sheet_name)
32
33
    def _get_sheet_index_by_name(self, sheet_name, sheet_names):
34
        sheet_names = list([slugify(name) for name in sheet_names])
35
        try:
36
            sheet_index = sheet_names.index(slugify(sheet_name))
37
            return sheet_index
38
        except ValueError:
39
            raise Exception(f"Invalid sheet name '{sheet_name}', sheet not found.")
40
41
    def _get_sheet_columns_indexes(self, columns_count):
42
        return [column_index for column_index in range(columns_count)]
43
44
    def _decode_legacy(self, s, **kwargs):
45
        filepath = s
46
47
        # load the worksheet
48
        workbook = open_workbook(filename=filepath)
49
50
        # get sheet by index or by name
51
        sheet_index, sheet_name = self._get_sheet_index_and_name_from_options(**kwargs)
52
        if sheet_name:
53
            sheet_names = workbook.sheet_names()
54
            sheet_index = self._get_sheet_index_by_name(sheet_name, sheet_names)
55
        sheet = workbook.sheet_by_index(sheet_index)
56
        sheet_columns_range = range(sheet.ncols)
57
58
        # get columns
59
        columns = kwargs.pop("columns", None)
60
        columns_row = kwargs.pop("columns_row", True)
61
        columns_standardized = kwargs.pop("columns_standardized", columns is None)
62
        if not columns:
63
            if columns_row:
64
                # if first row is for column names read the names
65
                # for row in sheet.iter_rows(min_row=1, max_row=1):
66
                columns = [
67
                    sheet.cell_value(0, col_index) for col_index in sheet_columns_range
68
                ]
69
            else:
70
                # otherwise use columns indexes as column names
71
                # for row in sheet.iter_rows(min_row=1, max_row=1):
72
                columns = self._get_sheet_columns_indexes(sheet_columns_range)
73
74
        # standardize column names, eg. "Date Created" -> "date_created"
75
        if columns_standardized:
76
            columns = [slugify(column, separator="_") for column in columns]
77
78
        # build list of dicts, one for each row
79
        items = []
80
        items_row_start = 1 if columns_row else 0
81
        for row_index in range(items_row_start, sheet.nrows):
82
            row = {}
83
            for col_index in sheet_columns_range:
84
                col_key = columns[col_index]
85
                value = sheet.cell_value(row_index, col_index)
86
                row[col_key] = value
87
            items.append(row)
88
89
        # print(items)
90
        return items
91
92
    def _decode(self, s, **kwargs):
93
        filepath = s
94
95
        # load the worksheet
96
        workbook = load_workbook(filename=filepath, read_only=True)
97
98
        # get sheet by index or by name
99
        sheet_index, sheet_name = self._get_sheet_index_and_name_from_options(**kwargs)
100
        sheets = [sheet for sheet in workbook]
101
        if sheet_name:
102
            sheet_names = [sheet.title for sheet in sheets]
103
            sheet_index = self._get_sheet_index_by_name(sheet_name, sheet_names)
104
        sheet = sheets[sheet_index]
105
        sheet_columns_cells = list(sheet.iter_rows(min_row=1, max_row=1))[0]
106
107
        # get columns
108
        columns = kwargs.pop("columns", None)
109
        columns_row = kwargs.pop("columns_row", True)
110
        columns_standardized = kwargs.pop("columns_standardized", columns is None)
111
        if not columns:
112
            if columns_row:
113
                # if first row is for column names read the names
114
                # for row in sheet.iter_rows(min_row=1, max_row=1):
115
                columns = [cell.value for cell in sheet_columns_cells]
116
            else:
117
                # otherwise use columns indexes as column names
118
                # for row in sheet.iter_rows(min_row=1, max_row=1):
119
                columns = self._get_sheet_columns_indexes(len(sheet_columns_cells))
120
121
        # standardize column names, eg. "Date Created" -> "date_created"
122
        if columns_standardized:
123
            columns = [slugify(column, separator="_") for column in columns]
124
125
        # build list of dicts, one for each row
126
        items = []
127
        items_row_start = 2 if columns_row else 1
128
        for row in sheet.iter_rows(min_row=items_row_start):
129
            values = list([cell.value for cell in row])
130
            items.append(dict(zip(columns, values)))
131
132
        # close the worksheet
133
        workbook.close()
134
135
        # print(items)
136
        return items
137
138
    def decode(self, s, **kwargs):
139
        extension = fsutil.get_file_extension(s)
140
        if extension in ["xlsx", "xlsm"]:
141
            return self._decode(s, **kwargs)
142
        elif extension in ["xls", "xlt"]:
143
            return self._decode_legacy(s, **kwargs)
144
145
    def encode(self, d, **kwargs):
146
        raise NotImplementedError
147