Passed
Push — master ( cfc911...b504e0 )
by Fabio
05:06
created

XLSSerializer._get_sheet_columns_indexes()   A

Complexity

Conditions 1

Size

Total Lines 2
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 2
nop 2
dl 0
loc 2
rs 10
c 0
b 0
f 0
1
# -*- coding: utf-8 -*-
2
3
import fsutil
4
from openpyxl import load_workbook
5
from slugify import slugify
6
from xlrd import open_workbook
7
8
from benedict.serializers.abstract import AbstractSerializer
9
10
11
class XLSSerializer(AbstractSerializer):
    """
    Serializer that reads Excel workbooks into a list of dicts,
    one dict for each data row of the selected sheet.

    "xlsx" / "xlsm" files are read with openpyxl, "xls" / "xlt" files
    with xlrd. Encoding is not implemented.

    Supported decode options (all optional):
    - sheet: sheet index (int) or sheet name (str), default 0.
    - columns: explicit list of column names, default None.
    - columns_row: whether the first row contains the column names,
      default True.
    - columns_standardized: whether to slugify the column names,
      eg. "Date Created" -> "date_created"; defaults to True only
      when no explicit columns are given.
    """

    def __init__(self):
        super().__init__(
            extensions=[
                "xls",
                "xlsx",
                "xlsm",
            ],
        )

    def _get_sheet_index_and_name_from_options(self, **kwargs):
        """
        Split the "sheet" option into a (sheet_index, sheet_name) tuple.

        An int value is used as index, a str value as name; any other
        value (or a missing option) selects index 0 with an empty name.
        """
        sheet_index_or_name = kwargs.pop("sheet", 0)
        sheet_index = 0
        sheet_name = ""
        if isinstance(sheet_index_or_name, int):
            sheet_index = sheet_index_or_name
        elif isinstance(sheet_index_or_name, str):
            sheet_name = sheet_index_or_name
        return (sheet_index, sheet_name)

    def _get_sheet_index_by_name(self, sheet_name, sheet_names):
        """
        Return the position of sheet_name inside sheet_names.

        Names are compared by their slugified form.
        Raises Exception if no sheet matches.
        """
        sheet_slugs = [slugify(name) for name in sheet_names]
        try:
            return sheet_slugs.index(slugify(sheet_name))
        except ValueError:
            # the ValueError from list.index adds nothing for the caller
            raise Exception(
                f"Invalid sheet name '{sheet_name}', sheet not found."
            ) from None

    def _get_sheet_columns_indexes(self, columns_count):
        """
        Return the list of column indexes [0, 1, ..., columns_count - 1].
        """
        return list(range(columns_count))

    def _standardize_columns(self, columns):
        """
        Return the slugified column names, eg. "Date Created" -> "date_created".
        """
        # Names can be ints (index fallback) or non-text header cell values
        # (numbers, empty cells): coerce them to text before slugifying.
        # NOTE(review): assumes slugify only accepts text input - confirm.
        return [
            slugify("" if column is None else str(column), separator="_")
            for column in columns
        ]

    def _decode_legacy(self, s, **kwargs):
        """
        Decode the "xls" / "xlt" file at path s using xlrd.

        Returns a list of dicts, one for each data row.
        """
        filepath = s

        # load the workbook
        workbook = open_workbook(filename=filepath)

        # get sheet by index or by name
        sheet_index, sheet_name = self._get_sheet_index_and_name_from_options(**kwargs)
        if sheet_name:
            sheet_names = workbook.sheet_names()
            sheet_index = self._get_sheet_index_by_name(sheet_name, sheet_names)
        sheet = workbook.sheet_by_index(sheet_index)
        columns_count = sheet.ncols

        # get columns
        columns = kwargs.pop("columns", None)
        columns_row = kwargs.pop("columns_row", True)
        columns_standardized = kwargs.pop("columns_standardized", columns is None)
        if not columns:
            if columns_row:
                # the first row holds the column names
                columns = [
                    sheet.cell_value(0, col_index)
                    for col_index in range(columns_count)
                ]
            else:
                # otherwise use the column indexes as column names
                # (the helper expects a count, not a range object)
                columns = self._get_sheet_columns_indexes(columns_count)

        if columns_standardized:
            columns = self._standardize_columns(columns)

        # build list of dicts, one for each row
        items = []
        items_row_start = 1 if columns_row else 0
        for row_index in range(items_row_start, sheet.nrows):
            values = [
                sheet.cell_value(row_index, col_index)
                for col_index in range(columns_count)
            ]
            # zip like _decode does, so a short columns list truncates the
            # row instead of raising IndexError
            items.append(dict(zip(columns, values)))

        return items

    def _decode(self, s, **kwargs):
        """
        Decode the "xlsx" / "xlsm" file at path s using openpyxl.

        Returns a list of dicts, one for each data row.
        """
        filepath = s

        # load the workbook
        workbook = load_workbook(filename=filepath, read_only=True)
        try:
            # get sheet by index or by name
            sheet_index, sheet_name = self._get_sheet_index_and_name_from_options(
                **kwargs
            )
            sheets = list(workbook)
            if sheet_name:
                sheet_names = [sheet.title for sheet in sheets]
                sheet_index = self._get_sheet_index_by_name(sheet_name, sheet_names)
            sheet = sheets[sheet_index]
            # cells of the first row; empty when the sheet has no rows
            first_row_cells = next(iter(sheet.iter_rows(min_row=1, max_row=1)), ())

            # get columns
            columns = kwargs.pop("columns", None)
            columns_row = kwargs.pop("columns_row", True)
            columns_standardized = kwargs.pop("columns_standardized", columns is None)
            if not columns:
                if columns_row:
                    # the first row holds the column names
                    columns = [cell.value for cell in first_row_cells]
                else:
                    # otherwise use the column indexes as column names
                    columns = self._get_sheet_columns_indexes(len(first_row_cells))

            if columns_standardized:
                columns = self._standardize_columns(columns)

            # build list of dicts, one for each row (openpyxl rows are 1-based)
            items_row_start = 2 if columns_row else 1
            items = [
                dict(zip(columns, [cell.value for cell in row]))
                for row in sheet.iter_rows(min_row=items_row_start)
            ]
        finally:
            # always release the file, even when decoding fails
            workbook.close()

        return items

    def decode(self, s, **kwargs):
        """
        Decode the Excel file at path s, choosing the reader by extension.

        Returns a list of dicts, or None when the extension is not one of
        "xlsx", "xlsm", "xls", "xlt".
        """
        # match the extension regardless of its case
        extension = fsutil.get_file_extension(s).lower()
        if extension in ("xlsx", "xlsm"):
            return self._decode(s, **kwargs)
        if extension in ("xls", "xlt"):
            return self._decode_legacy(s, **kwargs)
        return None

    def encode(self, d, **kwargs):
        """
        Encoding to Excel is not supported: always raises NotImplementedError.
        """
        raise NotImplementedError
149