Completed
Pull Request — master (#161)
by Denis
04:02
created

EventGenerator.__init__()   F

Complexity

Conditions 10

Size

Total Lines 37

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 27
CRAP Score 10.0045

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 10
dl 0
loc 37
ccs 27
cts 28
cp 0.9643
crap 10.0045
rs 3.1304
c 1
b 0
f 0

How to fix   Complexity   

Complexity

Complex classes like EventGenerator.__init__() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1 1
import logging
2 1
from datetime import datetime
3
4 1
from opcua import ua
5 1
from opcua import Node
6 1
import uuid
7
8
9 1
class EventGenerator(object):
10
11
    """
12
    Create an event based on an event type. Per default is BaseEventType used.
13
    Object members are dynamically created from the base event type and send to
14
    client when evebt is triggered (see example code in source)
15
16
    Arguments to constructor are:
17
18
        server: The InternalSession object to use for query and event triggering
19
20
        source: The emiting source for the node, either an objectId, NodeId or a Node
21
22
        etype: The event type, either an objectId, a NodeId or a Node object
23
    """
24
25 1
    def __init__(self, isession, etype=None, source=ua.ObjectIds.Server):
26 1
        if not etype:
27
            etype = ua.BaseEvent()
28
29 1
        self.logger = logging.getLogger(__name__)
30 1
        self.isession = isession
31 1
        self.event = None
32 1
        node = None
33
34 1
        if isinstance(etype, ua.BaseEvent):
35 1
            self.event = etype
36 1
        elif isinstance(etype, Node):
37 1
            node = etype
38 1
        elif isinstance(etype, ua.NodeId):
39 1
            node = Node(self.isession, etype)
40
        else:
41 1
            node = Node(self.isession, ua.NodeId(etype))
42
43 1
        if node:
44 1
            self.event = get_event_from_type_node(node)
45
46 1
        if isinstance(source, Node):
47 1
            pass
48 1
        elif isinstance(source, ua.NodeId):
49 1
            source = Node(isession, source)
50
        else:
51 1
            source = Node(isession, ua.NodeId(source))
52
53 1
        if self.event.SourceNode:
54 1
            if source.nodeid != self.event.SourceNode:
55 1
                self.logger.warning("Source NodeId: '%s' and event SourceNode: '%s' are not the same. Using '%s' as SourceNode", str(source.nodeid), str(self.event.SourceNode), str(self.event.SourceNode))
56 1
                source = Node(self.isession, self.event.SourceNode)
57
58 1
        self.event.SourceNode = source.nodeid
59 1
        self.event.SourceName = source.get_display_name().Text
60
61 1
        source.set_attribute(ua.AttributeIds.EventNotifier, ua.DataValue(ua.Variant(1, ua.VariantType.Byte)))
62
63 1
    def __str__(self):
64
        return "EventGenerator(Type:{}, Source:{}, Time:{}, Message: {})".format(self.EventType, self.SourceNode, self.Time, self.Message)
65 1
    __repr__ = __str__
66
67 1
    def trigger(self, time=None, message=None):
68
        """
69
        Trigger the event. This will send a notification to all subscribed clients
70
        """
71 1
        self.event.EventId = ua.Variant(uuid.uuid4().hex, ua.VariantType.ByteString)
72 1
        if time:
73 1
            self.event.Time = time
74
        else:
75
            self.event.Time = datetime.utcnow()
76 1
        self.event.RecieveTime = datetime.utcnow()
77
        #FIXME: LocalTime is wrong but currently know better. For description s. Part 5 page 18
78 1
        self.event.LocalTime = datetime.utcnow()
79 1
        if message:
80 1
            self.event.Message = ua.LocalizedText(message)
81
        elif not self.event.Message:
82
            self.event.Message = ua.LocalizedText(Node(self.isession, self.event.SourceNode).get_browse_name().Text)
83 1
        self.isession.subscription_service.trigger_event(self.event)
84
85
86 1
def get_event_from_type_node(node):
87 1
    if node.nodeid.Identifier in ua.uaevents_auto.IMPLEMENTED_EVENTS.keys():
88 1
        return ua.uaevents_auto.IMPLEMENTED_EVENTS[node.nodeid.Identifier]()
89
    else:
90 1
        parent_identifier, parent_eventtype = _find_parent_eventtype(node)
91 1
        if not parent_eventtype:
92
            return None
93
94 1
        class CustomEvent(parent_eventtype):
95
96 1
            def __init__(self):
97 1
                super(CustomEvent, self).__init__(extended=True)
98 1
                self.EventType = node.nodeid
99 1
                curr_node = node
100
101 1 View Code Duplication
                while curr_node.nodeid.Identifier != parent_identifier:
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
102 1
                    for prop in curr_node.get_properties():
103 1
                        setattr(self, prop.get_browse_name().Name, prop.get_value())
104 1
                    parents = curr_node.get_referenced_nodes(refs=ua.ObjectIds.HasSubtype, direction=ua.BrowseDirection.Inverse, includesubtypes=False)
105 1
                    if len(parents) != 1: # Something went wrong
106
                        return None
107 1
                    curr_node = parents[0]
108
109 1
                self._freeze = True
110
111 1
    return CustomEvent()
112
113
114 1
def get_event_properties_from_type_node(node):
115 1
    properties = []
116 1
    curr_node = node
117
118 1
    while True:
119 1
        properties.extend(curr_node.get_properties())
120 View Code Duplication
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
121 1
        if curr_node.nodeid.Identifier == ua.ObjectIds.BaseEventType:
122 1
            break
123
124 1
        parents = curr_node.get_referenced_nodes(refs=ua.ObjectIds.HasSubtype, direction=ua.BrowseDirection.Inverse, includesubtypes=False)
125 1
        if len(parents) != 1: # Something went wrong
126
            return None
127 1
        curr_node = parents[0]
128
129 1
    return properties
130
131
132 1
def _find_parent_eventtype(node):
133 1
    parents = node.get_referenced_nodes(refs=ua.ObjectIds.HasSubtype, direction=ua.BrowseDirection.Inverse, includesubtypes=False)
134
135 1
    if len(parents) != 1:   # Something went wrong
136
        return None, None
137 1
    if parents[0].nodeid.Identifier in ua.uaevents_auto.IMPLEMENTED_EVENTS.keys():
138 1
        return parents[0].nodeid.Identifier, ua.uaevents_auto.IMPLEMENTED_EVENTS[parents[0].nodeid.Identifier]
139
    else:
140
        return _find_parent_eventtype(parents[0])
141