so_magic.data.magic_datapoints_factory   A
last analyzed

Complexity

Total Complexity 4

Size/Duplication

Total Lines 58
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 21
dl 0
loc 58
rs 10
c 0
b 0
f 0
wmc 4

1 Method

Rating   Name   Duplication   Size   Complexity  
A BroadcastingDatapointsFactory.create() 0 28 4
1
"""This module is responsible to provide means of creating (instantiating) objects
2
representing Datapoints collections."""
3
import logging
4
from typing import Iterable
5
import attr
6
from so_magic.utils import Subject
7
from .datapoints import DatapointsFactory
8
9
logger = logging.getLogger(__name__)
10
11
12
# TODO refactor to a composition (not inheritance) implementation of the below class
13
# reason is that it will be more evident that this class simply combines the
14
# features of 2 other classes (Subject & DatapointsFactory) to get the job done
15
@attr.s
class BroadcastingDatapointsFactory(DatapointsFactory):
    """Creates Datapoints objects and informs its subscribers when that happens.

    A factory class that informs its subscribers when a new object that
    implements the DatapointsInterface is created (following a request).

    Args:
        subject (Subject, optional): the subject of observation; the "thing" that others
                          listen to. A fresh Subject is created when omitted.

    Attributes:
        name (str): not an ``__init__`` argument; starts as '' and is overwritten on
            every 'create' call that receives positional arguments, with the 'name'
            attribute of the first one (or '' if it has none).
    """
    subject: Subject = attr.ib(init=True, default=attr.Factory(Subject))
    name: str = attr.ib(init=False, default='')
    # TODO check if above can be removed, along with self.name = getattr(args[0], 'name', '') in 'create' method

    def create(self, datapoints_factory_type: str, *args, **kwargs) -> Iterable:
        """Create new Datapoints and inform subscribers.

        The factory method that returns a new object of DatapointsInterface, by
        delegating the object creation to the parent DatapointsFactory.create.
        The created object is stored as the subject's state and the subject's
        'notify' method is called before returning.

        Args:
            datapoints_factory_type (str): the name of the "constructor" to use
            *args: positional arguments forwarded to the parent 'create' method
            **kwargs: only 'id', 'name' and 'file_path' are accepted. All three are
                consumed here (never forwarded); the first one present, in that
                priority order, becomes the subject's name ('' if none is given).

        Raises:
            RuntimeError: if any keyword argument other than 'id', 'name' or
                'file_path' is supplied. Note that the subject's name has already
                been updated by the time this is raised.

        Returns:
            Iterable: instance implementing the DatapointsInterface
        """
        # Pop all three recognised keys unconditionally, so none of them can trip
        # the "unsupported kwargs" check below; priority is id > name > file_path.
        file_path = kwargs.pop('file_path', '')
        fallback_name = kwargs.pop('name', file_path)
        self.subject.name = kwargs.pop('id', fallback_name)

        if kwargs:
            msg = f"Kwargs: [{', '.join(f'{k}: {v}' for k, v in kwargs.items())}]"
            raise RuntimeError("The 'create' method of DatapointsFactory does not support kwargs:", msg)

        # kwargs is guaranteed to be empty at this point, so only the positional
        # arguments reach the parent factory.
        self.subject.state = super().create(datapoints_factory_type, *args)

        # The previous guard was "not hasattr(self, '.name')", which is always true
        # (no attribute can be called '.name'); the effective behaviour is simply
        # "refresh the name whenever positional arguments were given".
        if args:
            self.name = getattr(args[0], 'name', '')

        self.subject.notify()
        return self.subject.state
58