1
|
|
|
"""This module is responsible to provide means of creating (instantiating) objects |
2
|
|
|
representing Datapoints collections.""" |
3
|
|
|
import logging |
4
|
|
|
from typing import Iterable |
5
|
|
|
import attr |
6
|
|
|
from so_magic.utils import Subject |
7
|
|
|
from .datapoints import DatapointsFactory |
8
|
|
|
|
9
|
|
|
logger = logging.getLogger(__name__) |
10
|
|
|
|
11
|
|
|
|
12
|
|
|
# TODO refactor to a composition (not inheritance) implementation of the below class |
13
|
|
|
# reason is that it will be more evident that this class simply combines the |
14
|
|
|
# features of 2 other classes (Subject & DatapointsFactory) to get the job done |
15
|
|
|
@attr.s
class BroadcastingDatapointsFactory(DatapointsFactory):
    """Creates Datapoints objects and informs its subscribers when that happens.

    A factory class that informs its subscribers when a new object that
    implements the DatapointsInterface is created (following a request).

    Args:
        subject (Subject, optional): the subject of observation; the "thing" that others
            listen to
    """
    subject: Subject = attr.ib(init=True, default=attr.Factory(Subject))
    # Not an __init__ argument; overwritten by 'create' with the 'name' attribute
    # of the first positional argument (or '' if that object has no such attribute).
    name: str = attr.ib(init=False, default='')
    # TODO check if above can be removed, along with self.name = getattr(args[0], 'name', '') in 'create' method

    def create(self, datapoints_factory_type: str, *args, **kwargs) -> Iterable:
        """Create new Datapoints and inform subscribers.

        The factory method that returns a new object of DatapointsInterface, by
        looking at the registered constructors to delegate the object creation.

        The created object is stored as the subject's state, the subject's name
        is updated, and the subject is notified, before the object is returned.

        Args:
            datapoints_factory_type (str): the name of the "constructor" to use
            *args: positional arguments forwarded to the parent factory's 'create';
                if any are given, the 'name' attribute of the first one (or '')
                is stored as this factory's 'name'
            **kwargs: only 'id', 'name' and 'file_path' are accepted; the first
                one present, in that order of priority, becomes the subject's
                name (defaults to '')

        Raises:
            RuntimeError: if any keyword argument other than 'id', 'name' or
                'file_path' is supplied

        Returns:
            Iterable: instance implementing the DatapointsInterface
        """
        # Consume all three recognised keys (so none of them trips the check
        # below) and resolve the subject name with priority id > name > file_path.
        file_path = kwargs.pop('file_path', '')
        name = kwargs.pop('name', file_path)
        subject_name = kwargs.pop('id', name)

        # Validate before touching the subject, so a rejected call leaves it unchanged.
        if kwargs:
            msg = f"Kwargs: [{', '.join(f'{k}: {v}' for k, v in kwargs.items())}]"
            raise RuntimeError("The 'create' method of DatapointsFactory does not support kwargs:", msg)

        self.subject.name = subject_name
        # kwargs is guaranteed to be empty at this point, so only args are forwarded.
        self.subject.state = super().create(datapoints_factory_type, *args)

        # The original guard was `not hasattr(self, '.name')`, which is always
        # true ('.name' is not a reachable attribute name); the effective
        # behaviour, kept here, is to refresh the name whenever args are given.
        if args:
            self.name = getattr(args[0], 'name', '')

        self.subject.notify()
        return self.subject.state
58
|
|
|
|