benedict.serializers.csv.CSVSerializer.decode()   B
last analyzed

Complexity

Conditions 6

Size

Total Lines 21
Code Lines 19

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 6
eloc 19
nop 3
dl 0
loc 21
rs 8.5166
c 0
b 0
f 0
1
import csv
2
from io import StringIO
3
4
from benedict.serializers.abstract import AbstractSerializer
5
from benedict.utils import type_util
6
7
8
class CSVSerializer(AbstractSerializer):
    """
    Serializer that converts between CSV text and lists of rows.

    Decoding produces a list of dicts (one per data row); encoding accepts
    a list whose items are dicts, collections or single values.
    """

    def __init__(self):
        # Hand the file extension handled by this serializer to the base class.
        super().__init__(
            extensions=[
                "csv",
            ],
        )

    def decode(self, s, **kwargs):
        """
        Parse the CSV string ``s`` into a list of dicts, one per data row.

        The following keyword arguments are consumed here; every other
        keyword argument is forwarded to ``csv.reader``:

        - ``quote``: if truthy, ``quoting`` defaults to ``csv.QUOTE_ALL``.
        - ``columns``: keys used for the row dicts. When missing or empty
          they are taken from the first row (only if ``columns_row`` is
          truthy).
        - ``columns_row``: whether the first row is a header row that must
          be skipped (default ``True``).

        Columns and values are paired with ``zip``, so a row that is longer
        or shorter than ``columns`` is truncated to the shorter of the two.

        Raises:
            TypeError: if there is at least one row to decode but no columns
                are known (``columns_row`` falsy and ``columns`` not given).
        """
        if kwargs.pop("quote", False):
            # TODO: add tests coverage
            kwargs.setdefault("quoting", csv.QUOTE_ALL)
        columns = kwargs.pop("columns", None)
        columns_row = kwargs.pop("columns_row", True)
        rows = csv.reader(StringIO(s), **kwargs)
        if columns_row:
            # The header row is always consumed, but it only supplies the
            # column names when the caller did not provide any.
            header = next(rows, None)
            if not columns:
                columns = header
        if columns is None:
            # No column names available: acceptable only if nothing is left
            # to decode, otherwise fail with an explicit message instead of
            # the opaque error raised by ``zip(None, row)``.
            if next(rows, None) is None:
                return []
            raise TypeError(
                "decode() requires 'columns' when 'columns_row' is False"
            )
        return [dict(zip(columns, row)) for row in rows]

    def encode(self, d, **kwargs):
        """
        Serialize the list ``d`` to a CSV string.

        The following keyword arguments are consumed here; every other
        keyword argument is forwarded to ``csv.writer``:

        - ``quote``: if truthy, ``quoting`` defaults to ``csv.QUOTE_ALL``.
        - ``lineterminator``: defaults to ``"\\n"``.
        - ``columns``: keys to export, in order. When missing or empty and
          the first item is a dict, the keys of that dict are used, ordered
          by their string representation.
        - ``columns_row``: whether to write the columns as the first row
          (default ``True``).

        Dict items are written as the values of ``columns`` (``""`` for
        missing keys); items accepted by ``type_util.is_collection`` are
        written as they are; any other item is written as a one-cell row.
        """
        rows = d
        if kwargs.pop("quote", False):
            kwargs.setdefault("quoting", csv.QUOTE_ALL)
        kwargs.setdefault("lineterminator", "\n")
        columns = kwargs.pop("columns", None)
        columns_row = kwargs.pop("columns_row", True)
        header = columns
        if not columns and len(rows) and type_util.is_dict(rows[0]):
            # Keep the original key objects for the lookups below, so that
            # non-string keys (e.g. ints) still find their values, and use
            # their string form only for ordering and for the header row.
            columns = sorted(rows[0].keys(), key=str)
            header = [str(key) for key in columns]
        buffer = StringIO()
        writer = csv.writer(buffer, **kwargs)
        if columns_row and columns:
            writer.writerow(header)
        for item in rows:
            if type_util.is_dict(item):
                row = [item.get(key, "") for key in columns]
            elif type_util.is_collection(item):
                # TODO: add tests coverage
                row = item
            else:
                # TODO: add tests coverage
                row = [item]
            writer.writerow(row)
        return buffer.getvalue()
69