1
|
|
|
"""Defines the DatapointsManager type (class); a centralized facility where all |
2
|
|
|
datapoints objects should arrive and be retrieved from.""" |
3
|
|
|
from typing import Iterable, Optional |
4
|
|
|
import logging |
5
|
|
|
import attr |
6
|
|
|
from so_magic.utils import Observer, Subject |
7
|
|
|
|
8
|
|
|
# Module-level logger named after this module; used below to report failed datapoints lookups.
logger = logging.getLogger(__name__) |
9
|
|
|
|
10
|
|
|
|
11
|
|
|
@attr.s
class DatapointsManager(Observer):
    """Manage operations revolving around datapoints collection objects.

    Instances of this class are able to monitor (listener/observer pattern) the creation of
    datapoints collection objects and store them in a dictionary structure.
    They also provide retrieval methods to the client to "pick up" a datapoints object.

    Args:
        datapoints_objects (dict, optional): the initial structure that stores datapoints objects;
            when omitted, each instance gets its own fresh, empty dict
    """
    # Use a factory so that every instance owns a separate dict. A literal `{}` default
    # would be evaluated once and shared by all instances created without an explicit
    # argument, leaking registered keys from one manager into another.
    datapoints_objects = attr.ib(init=True, default=attr.Factory(dict))
    # Key under which the most recent datapoints object was stored ('' until the first update).
    _last_key = attr.ib(init=False, default='')

    def update(self, subject: Subject) -> None:
        """Update our state based on the event/observation captured/made.

        Stores the datapoints object observed (the subject's 'state') in a dictionary,
        using the subject's 'name' attribute as key, and remembers that key as the latest one.

        Args:
            subject (Subject): the subject object observed; it acts as an event

        Raises:
            RuntimeError: in case there is no 'name' attribute on the subject or if it is an empty string ''
            RuntimeError: in case the 'name' attribute on the subject has already been used to store a datapoints object
        """
        datapoints_object = subject.state
        # A missing 'name' attribute is treated exactly like an empty one.
        key = getattr(subject, 'name', '')
        if key == '':
            # Note the trailing space: the two literals are concatenated into one message.
            raise RuntimeError(f'Subject {subject} with state {datapoints_object!s} resulted in an empty string as key. '
                               f'We reject the key, since it is going to "query" a in dict/hash).')
        if key in self.datapoints_objects:
            # Never silently overwrite an already registered datapoints object.
            raise RuntimeError(f"Attempted to register a new Datapoints object at the existing key '{key}'.")
        self.datapoints_objects[key] = datapoints_object
        self._last_key = key

    @property
    def state(self) -> str:
        """The latest (most recent) key used to store a datapoints object.

        Returns:
            str: the key under which we stored a datapoints object last time
        """
        return self._last_key

    @property
    def datapoints(self) -> Optional[Iterable]:  # indicates that the method can return an Iterable or None types
        """The most recently stored datapoints object.

        If no object is registered under the latest key (e.g. nothing has been stored yet),
        the failure is logged as an error and None is returned instead of raising.

        Returns:
            Optional[Iterable]: the reference to the datapoints object, or None if the lookup failed
        """
        try:
            return self.datapoints_objects[self._last_key]
        except KeyError as exception:
            # Pass the joined keys as a plain string (not wrapped in a set) and convert
            # each key with str so that logging cannot fail on non-string keys.
            logger.error("%s . Requested datapoints with id '%s', but was not found in registered [%s]",
                         exception, self._last_key, ', '.join(str(key) for key in self.datapoints_objects))
            return None
69
|
|
|
|