| Total Complexity | 6 |
| Total Lines | 30 |
| Duplicated Lines | 0 % |
| Changes | 0 | ||
| 1 | import attr |
||
| 2 | from so_magic.utils import notification as n |
||
| 3 | |||
| 4 | |||
@attr.s
class DatapointsManager(n.Observer):
    """Keep a registry of datapoints objects, filled in through `update` calls.

    Each call to `update` stores the notifying subject's `state` under the
    subject's `name` and remembers that name as the most recent key.

    Attributes:
        datapoints_objects: mapping of key (the subject's `name`) to the
            registered datapoints object (the subject's `state`).
        _last_key: key used by the most recent successful `update`; the empty
            string until the first registration.
    """

    # attr.Factory builds a fresh dict per instance. A literal `{}` default
    # would be a single dict object shared by every instance created without
    # an explicit `datapoints_objects` argument.
    datapoints_objects = attr.ib(init=True, default=attr.Factory(dict))
    _last_key = attr.ib(init=False, default='')

    def update(self, subject: n.Subject) -> None:
        """Register the subject's state under the subject's name.

        Args:
            subject: object exposing `state` (the datapoints object to store)
                and, optionally, `name` (the key to store it under).

        Raises:
            RuntimeError: if the subject has no `name` attribute or its name
                is the empty string, or if the name is already registered.
        """
        datapoints_object = subject.state
        # A missing `name` attribute is treated the same as an empty name.
        key = getattr(subject, 'name', '')
        if key == '':
            raise RuntimeError(f"Subject {subject} with state {str(subject.state)} resulted in an empty string as key (to use in dict/hash).")
        # Existing registrations are never overwritten silently.
        if key in self.datapoints_objects:
            raise RuntimeError(f"Attempted to register a new Datapoints object at the existing key '{key}'.")
        self.datapoints_objects[key] = datapoints_object
        self._last_key = key

    @property
    def state(self):
        """Return the key of the most recently registered datapoints object."""
        return self._last_key

    @property
    def datapoints(self):
        """Return the most recently registered datapoints object.

        Returns:
            The object stored under the last registered key, or None (after
            printing a diagnostic message) when that key is not in the
            registry, e.g. before any registration has happened.
        """
        try:
            return self.datapoints_objects[self._last_key]
        except KeyError as e:
            # Best-effort lookup: report the miss and fall through to None
            # instead of propagating the KeyError to the caller.
            print(f"{e}. Requested datapoints with id '{self._last_key}', but was not found in registered [{', '.join(str(key) for key in self.datapoints_objects)}]")
            return None
||
| 30 |