Passed
Push — datapoints-package ( a11eff )
by Konstantinos
02:48
created

so_magic.data.datapoints.datapoints_manager   A

Complexity

Total Complexity 6

Size/Duplication

Total Lines 30
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 25
dl 0
loc 30
rs 10
c 0
b 0
f 0
wmc 6

3 Methods

Rating   Name   Duplication   Size   Complexity  
A DatapointsManager.update() 0 9 3
A DatapointsManager.datapoints() 0 6 2
A DatapointsManager.state() 0 3 1
1
import attr
2
from so_magic.utils import notification as n
3
4
5
@attr.s
class DatapointsManager(n.Observer):
    """Registry of datapoints objects, filled in through ``update`` calls.

    Each call to ``update`` stores the notifying subject's ``state`` under the
    subject's ``name`` and remembers that name as the most recent key.

    Attributes:
        datapoints_objects: mapping of key -> registered datapoints object.
            Defaults to a fresh, empty dict for every instance.
        _last_key: key used by the most recent successful ``update``; the
            empty string until something has been registered.
    """

    # A factory is required here: a literal ``{}`` default would be created
    # once and shared by every instance constructed without an explicit dict.
    datapoints_objects = attr.ib(init=True, default=attr.Factory(dict))
    _last_key = attr.ib(init=False, default='')

    def update(self, subject: n.Subject):
        """Register the subject's current state under the subject's name.

        Args:
            subject: object exposing a ``state`` attribute (the object to
                store) and a ``name`` attribute (the key to store it under).

        Raises:
            RuntimeError: if the subject has no ``name`` or it is the empty
                string, or if the key is already registered.
        """
        datapoints_object = subject.state
        # A missing 'name' attribute is treated the same as an empty one.
        key = getattr(subject, 'name', '')
        if key == '':
            raise RuntimeError(f"Subject {subject} with state {str(subject.state)} resulted in an empty string as key (to use in dict/hash).")
        # Existing entries are never overwritten.
        if key in self.datapoints_objects:
            raise RuntimeError(f"Attempted to register a new Datapoints object at the existing key '{key}'.")
        self.datapoints_objects[key] = datapoints_object
        self._last_key = key

    @property
    def state(self):
        """Return the key of the most recently registered datapoints object."""
        return self._last_key

    @property
    def datapoints(self):
        """Return the most recently registered datapoints object.

        If the last key is not present in the registry (e.g. nothing has been
        registered yet), a diagnostic is printed and ``None`` is returned.
        """
        try:
            return self.datapoints_objects[self._last_key]
        except KeyError as e:
            # Keys are stringified so that a non-string key cannot make the
            # diagnostic itself fail with a TypeError inside this handler.
            print(f"{e}. Requested datapoints with id '{self._last_key}', but was not found in registered [{', '.join(str(_) for _ in self.datapoints_objects)}]")
            return None
30