Passed
Push — master ( d636b2...ee6986 )
by Fabio
06:00
created

benedict.serializers.xls.XLSSerializer.__init__()   A

Complexity

Conditions 1

Size

Total Lines 6
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 6
nop 1
dl 0
loc 6
rs 10
c 0
b 0
f 0
1
# -*- coding: utf-8 -*-
2
3
from benedict.serializers.abstract import AbstractSerializer
4
from openpyxl import load_workbook
5
from slugify import slugify
6
from xlrd import open_workbook
7
import fsutil
8
9
10
class XLSSerializer(AbstractSerializer):
11
    """
12
    This class describes a xls serializer.
13
    """
14
15
    def __init__(self):
16
        super(XLSSerializer, self).__init__(
17
            extensions=[
18
                "xls",
19
                "xlsx",
20
                "xlsm",
21
            ],
22
        )
23
24
    def _get_sheet_index_and_name(self, **kwargs):
25
        sheet_index_or_name = kwargs.pop("sheet", 0)
26
        sheet_index = 0
27
        sheet_name = ""
28
        if isinstance(sheet_index_or_name, int):
29
            sheet_index = sheet_index_or_name
30
        elif isinstance(sheet_index_or_name, str):
31
            sheet_name = sheet_index_or_name
32
        return (sheet_index, sheet_name)
33
34
    def _decode_legacy(self, s, **kwargs):
35
        filepath = s
36
37
        # load the worksheet
38
        workbook = open_workbook(filename=filepath)
39
40
        # get sheet by index or by name
41
        sheet_index, sheet_name = self._get_sheet_index_and_name(**kwargs)
42
        if sheet_name:
43
            sheet_names = workbook.sheet_names()
44
            try:
45
                sheet_index = sheet_names.index(slugify(sheet_name))
46
            except ValueError:
47
                raise Exception(f"Invalid sheet name '{sheet_name}', sheet not found.")
48
        sheet = workbook.sheet_by_index(sheet_index)
49
        sheet_columns_range = range(sheet.ncols)
50
51
        # get columns
52
        columns = kwargs.pop("columns", None)
53
        columns_row = kwargs.pop("columns_row", True)
54
        columns_standardized = kwargs.pop("columns_standardized", columns is None)
55
        if not columns:
56
            if columns_row:
57
                # if first row is for column names read the names
58
                # for row in sheet.iter_rows(min_row=1, max_row=1):
59
                columns = [
60
                    sheet.cell_value(0, col_index) for col_index in sheet_columns_range
61
                ]
62
            else:
63
                # otherwise use columns indexes as column names
64
                # for row in sheet.iter_rows(min_row=1, max_row=1):
65
                columns = [col_index for col_index in sheet_columns_range]
66
67
        # standardize column names, eg. "Date Created" -> "date_created"
68
        if columns_standardized:
69
            columns = [slugify(column, separator="_") for column in columns]
70
71
        # build list of dicts, one for each row
72
        items = []
73
        items_row_start = 1 if columns_row else 0
74
        for row_index in range(items_row_start, sheet.nrows):
75
            row = {}
76
            for col_index in sheet_columns_range:
77
                col_key = columns[col_index]
78
                value = sheet.cell_value(row_index, col_index)
79
                row[col_key] = value
80
            items.append(row)
81
82
        # print(items)
83
        return items
84
85
    def _decode(self, s, **kwargs):
86
        filepath = s
87
88
        # load the worksheet
89
        workbook = load_workbook(filename=filepath, read_only=True)
90
91
        # select sheet by index or by name
92
        sheet_index, sheet_name = self._get_sheet_index_and_name(**kwargs)
93
        sheets = [sheet for sheet in workbook]
94
        if sheet_name:
95
            sheet_names = [slugify(sheet.title) for sheet in sheets]
96
            try:
97
                sheet_index = sheet_names.index(slugify(sheet_name))
98
            except ValueError:
99
                raise Exception(f"Invalid sheet name '{sheet_name}', sheet not found.")
100
        sheet = sheets[sheet_index]
101
        sheet_columns_cells = list(sheet.iter_rows(min_row=1, max_row=1))[0]
102
103
        # get columns
104
        columns = kwargs.pop("columns", None)
105
        columns_row = kwargs.pop("columns_row", True)
106
        columns_standardized = kwargs.pop("columns_standardized", columns is None)
107
        if not columns:
108
            if columns_row:
109
                # if first row is for column names read the names
110
                # for row in sheet.iter_rows(min_row=1, max_row=1):
111
                columns = [cell.value for cell in sheet_columns_cells]
112
            else:
113
                # otherwise use columns indexes as column names
114
                # for row in sheet.iter_rows(min_row=1, max_row=1):
115
                columns = [index for index in range(len(sheet_columns_cells))]
116
117
        # standardize column names, eg. "Date Created" -> "date_created"
118
        if columns_standardized:
119
            columns = [slugify(column, separator="_") for column in columns]
120
121
        # build list of dicts, one for each row
122
        items = []
123
        items_row_start = 2 if columns_row else 1
124
        for row in sheet.iter_rows(min_row=items_row_start):
125
            values = list([cell.value for cell in row])
126
            items.append(dict(zip(columns, values)))
127
128
        # close the worksheet
129
        workbook.close()
130
131
        # print(items)
132
        return items
133
134
    def decode(self, s, **kwargs):
135
        extension = fsutil.get_file_extension(s)
136
        if extension in ["xlsx", "xlsm"]:
137
            return self._decode(s, **kwargs)
138
        elif extension in ["xls", "xlt"]:
139
            return self._decode_legacy(s, **kwargs)
140
141
    def encode(self, d, **kwargs):
142
        raise NotImplementedError
143