etlt.writer.Writer   A
last analyzed

Complexity

Total Complexity 6

Size/Duplication

Total Lines 57
Duplicated Lines 0 %

Test Coverage

Coverage 0%

Importance

Changes 0
Metric Value
eloc 20
dl 0
loc 57
ccs 0
cts 15
cp 0
rs 10
c 0
b 0
f 0
wmc 6

5 Methods

Rating   Name   Duplication   Size   Complexity  
A Writer.__exit__() 0 2 1
A Writer.__enter__() 0 2 1
A Writer.fields() 0 6 1
A Writer.writerow() 0 8 1
A Writer.__init__() 0 7 1
1
import abc
2
import copy
3
from typing import Any, Dict, List
4
5
6
class Writer(metaclass=abc.ABCMeta):
    """
    Abstract parent class for writing rows to a destination.

    Concrete child classes must implement writerow(). The base implementations of __enter__() and __exit__() raise
    NotImplementedError, so a child class has to override them before it can be used in a with statement.
    """

    # ------------------------------------------------------------------------------------------------------------------
    def __init__(self):
        """
        Object constructor.
        """

        self._fields: List[str] = []
        """
        The fields (columns) that must be written to the destination.
        """

    # ------------------------------------------------------------------------------------------------------------------
    @property
    def fields(self) -> List[str]:
        """
        Getter for fields.

        Returns a shallow copy, so modifying the returned list does not change the fields of this writer.
        """
        return copy.copy(self._fields)

    # ------------------------------------------------------------------------------------------------------------------
    @fields.setter
    def fields(self, fields: List[str]) -> None:
        """
        Setter for fields.

        :param fields: The fields (or columns) that must be written to the destination.
        """
        # Store a shallow copy (mirroring the getter) so that later modifications of the caller's list do not
        # silently change the fields of this writer.
        self._fields = copy.copy(fields)

    # ------------------------------------------------------------------------------------------------------------------
    @abc.abstractmethod
    def writerow(self, row: Dict[str, Any]) -> None:
        """
        Writes a row to the destination.

        :param row: The row.

        :raises NotImplementedError: Always, when this base implementation is invoked.
        """
        raise NotImplementedError()

    # ------------------------------------------------------------------------------------------------------------------
    def __enter__(self):
        """
        Enters the runtime context of this writer.

        :raises NotImplementedError: Always, this base implementation must be overridden.
        """
        raise NotImplementedError()

    # ------------------------------------------------------------------------------------------------------------------
    def __exit__(self, exc_type, exc_value, traceback):
        """
        Exits the runtime context of this writer.

        :param exc_type: The type of the exception raised in the with block, if any.
        :param exc_value: The exception raised in the with block, if any.
        :param traceback: The traceback of the exception raised in the with block, if any.

        :raises NotImplementedError: Always, this base implementation must be overridden.
        """
        raise NotImplementedError()
57
58
# ----------------------------------------------------------------------------------------------------------------------
59