Passed
Push — master ( cc7a4b...4d42d8 )
by Konstantinos
43s queued 14s
created

so_magic.data.magic_datapoints_factory   A

Complexity

Total Complexity 4

Size/Duplication

Total Lines 63
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 25
dl 0
loc 63
rs 10
c 0
b 0
f 0
wmc 4

1 Method

Rating   Name   Duplication   Size   Complexity  
A BroadcastingDatapointsFactory.create() 0 32 4
1
"""This module is responsible to provide means of creating (instantiating) objects
2
representing Datapoints collections."""
3
import logging
4
import json
5
from typing import Iterable
6
import attr
7
from so_magic.utils import Subject
8
from .datapoints import DatapointsFactory
9
10
logger = logging.getLogger(__name__)
11
12
13
# TODO refactor to a composition (not inheritance) implementation of the below class
14
# reason is that it will be more evident that this class simply combines the
15
# features of 2 other classes (Subject & DatapointsFactory) to get the job done
16
@attr.s
17
class BroadcastingDatapointsFactory(DatapointsFactory):
18
    """Creates Datapoints objects and informs its subscribers when that happens.
19
20
    A factory class that informs its subscribers when a new object that
21
    implements the DatapointsInterface is created (following a request).
22
23
    Args:
24
        subject (Subject, optional): the subject of observation; the "thing" that others
25
                          listen to
26
    """
27
    subject: Subject = attr.ib(init=True, default=attr.Factory(Subject))
28
    name: str = attr.ib(init=False, default='')
29
    # TODO check if above can be removed, along with self.name = getattr(args[0], 'name', '') in 'create' method
30
31
    def create(self, datapoints_factory_type: str, *args, **kwargs) -> Iterable:
32
        """Create new Datapoints and inform subscribers.
33
34
        The factory method that returns a new object of DatapointsInterface, by
35
        looking at the registered constructors to delegate the object creation.
36
37
        Args:
38
            datapoints_factory_type (str): the name of the "constructor" to use
39
40
        Raises:
41
            RuntimeError: [description]
42
43
        Returns:
44
            Iterable: instance implementing the DatapointsInterface
45
        """
46
        self.subject.name = kwargs.pop('id', kwargs.pop('name', kwargs.pop('file_path', '')))
47
        if kwargs:
48
            msg = f"Kwargs: [{', '.join(f'{k}: {v}' for k, v in kwargs.items())}]"
49
            raise RuntimeError("The 'create' method of DatapointsFactory does not support kwargs:", msg)
50
        self.subject.state = super().create(datapoints_factory_type, *args, **kwargs)
51
        # logger.debug(f"Created datapoints: {json.dumps({
52
        #     'datapoints': self.subject.state,
53
        #     'name': self.subject.name,
54
        # })}")
55
        print("Created datapoints: {}".format(json.dumps({
56
            'datapoints': str(self.subject.state),
57
            'name': self.subject.name,
58
        })))
59
        if args and not hasattr(self, '.name'):
60
            self.name = getattr(args[0], 'name', '')
61
        self.subject.notify()
62
        return self.subject.state
63