Completed
Pull Request — master (#161)
by Denis
02:27
created

EventGenerator.__init__()   F

Complexity

Conditions 10

Size

Total Lines 37

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 23
CRAP Score 10.5696

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 10
c 1
b 0
f 0
dl 0
loc 37
ccs 23
cts 28
cp 0.8214
crap 10.5696
rs 3.1304

How to fix   Complexity   

Complexity

Complex methods like EventGenerator.__init__() often do a lot of different things. To break such a method's class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
import logging
2 1
from datetime import datetime
3
4 1
from opcua import ua
5 1
from opcua import Node
6 1
import uuid
7
8
9 1
class EventGenerator(object):
    """
    Create an event based on an event type. By default BaseEventType is used.
    Object members are dynamically created from the base event type and sent
    to clients when the event is triggered (see example code in source).

    Arguments to constructor are:

        isession: The InternalSession object to use for queries and event
            triggering

        etype: The event type: a ua.BaseEvent instance, a Node, a NodeId or
            an ObjectId. When omitted, a fresh ua.BaseEvent is used

        source: The emitting source for the event, either an ObjectId, a
            NodeId or a Node. Defaults to ua.ObjectIds.Server
    """

    def __init__(self, isession, etype=None, source=ua.ObjectIds.Server):
        if not etype:
            etype = ua.BaseEvent()

        self.logger = logging.getLogger(__name__)
        self.isession = isession

        if isinstance(etype, ua.BaseEvent):
            # A ready-made event object is used as-is.
            self.event = etype
        else:
            # NOTE: get_event_from_type_node() returns None when no implemented
            # parent event type is found; the attribute accesses below will
            # then raise AttributeError.
            self.event = get_event_from_type_node(self._to_node(etype))

        source = self._to_node(source)

        # A SourceNode already present on the event wins over the argument.
        if self.event.SourceNode and source.nodeid != self.event.SourceNode:
            self.logger.warning("Source NodeId: '%s' and event SourceNode: '%s' are not the same. Using '%s' as SourceNode", str(source.nodeid), str(self.event.SourceNode), str(self.event.SourceNode))
            source = Node(self.isession, self.event.SourceNode)

        self.event.SourceNode = source.nodeid
        self.event.SourceName = source.get_display_name().Text

        # Write 1 to the EventNotifier attribute of the source node.
        source.set_attribute(ua.AttributeIds.EventNotifier, ua.DataValue(ua.Variant(1, ua.VariantType.Byte)))

    def _to_node(self, target):
        """
        Return ``target`` as a Node bound to this generator's session.

        Node instances are returned unchanged, NodeIds are wrapped directly,
        and anything else is first converted with ua.NodeId().
        """
        if isinstance(target, Node):
            return target
        if isinstance(target, ua.NodeId):
            return Node(self.isession, target)
        return Node(self.isession, ua.NodeId(target))

    def __str__(self):
        # The event fields live on self.event, not on the generator itself.
        return "EventGenerator(Type:{}, Source:{}, Time:{}, Message: {})".format(
            self.event.EventType,
            self.event.SourceNode,
            self.event.Time,
            self.event.Message)
    __repr__ = __str__

    def trigger(self, time=None, message=None):
        """
        Trigger the event. This will send a notification to all subscribed clients

        time: timestamp stored as the event's Time; the current UTC time is
            used when omitted
        message: text for the event's Message. When omitted and the event has
            no Message yet, the browse name of the source node is used
        """
        # One timestamp per trigger so all time fields of the event agree.
        now = datetime.utcnow()

        self.event.EventId = ua.Variant(uuid.uuid4().hex, ua.VariantType.ByteString)
        self.event.Time = time if time else now
        # NOTE(review): 'RecieveTime' looks like a misspelling of 'ReceiveTime';
        # confirm the attribute name on ua.BaseEvent before renaming it here.
        self.event.RecieveTime = now
        # FIXME: LocalTime is wrong, but we currently don't know better.
        # For a description see Part 5, page 18.
        self.event.LocalTime = now

        if message:
            self.event.Message = ua.LocalizedText(message)
        elif not self.event.Message:
            # get_browse_name() results are read through .Name elsewhere in
            # this module; .Text is only used on get_display_name() results.
            source_node = Node(self.isession, self.event.SourceNode)
            self.event.Message = ua.LocalizedText(source_node.get_browse_name().Name)

        self.isession.subscription_service.trigger_event(self.event)
84
85
86
def get_event_from_type_node(node):
    """
    Build an event object for the event type described by ``node``.

    If the node's identifier is a key of ua.uaevents_auto.IMPLEMENTED_EVENTS,
    the registered class is instantiated and returned. Otherwise the closest
    implemented ancestor is looked up with _find_parent_eventtype() and a
    subclass of it is created on the fly. Its instance gets one attribute per
    property found on ``node`` and on every type between ``node`` and that
    ancestor.

    Returns None when no implemented ancestor can be found.
    """
    implemented = ua.uaevents_auto.IMPLEMENTED_EVENTS
    type_id = node.nodeid.Identifier
    if type_id in implemented:
        return implemented[type_id]()

    parent_identifier, parent_eventtype = _find_parent_eventtype(node)
    if not parent_eventtype:
        return None

    class CustomEvent(parent_eventtype):

        def __init__(self):
            super(CustomEvent, self).__init__(extended=True)
            self.EventType = node.nodeid
            curr_node = node

            # Walk up the HasSubtype chain until the implemented ancestor is
            # reached, copying every property value onto this instance.
            while curr_node.nodeid.Identifier != parent_identifier:
                for prop in curr_node.get_properties():
                    setattr(self, prop.get_browse_name().Name, prop.get_value())
                parents = curr_node.get_referenced_nodes(refs=ua.ObjectIds.HasSubtype, direction=ua.BrowseDirection.Inverse, includesubtypes=False)
                if len(parents) != 1:  # Something went wrong
                    # NOTE: this only leaves __init__ early; the instance is
                    # still returned to the caller, without _freeze being set.
                    return None
                curr_node = parents[0]

            self._freeze = True

    return CustomEvent()
112
113
114
def get_event_properties_from_type_node(node):
    """
    Collect the properties of an event type and of all its supertypes.

    Starting at ``node``, the properties of each type are appended to the
    result, then the single inverse HasSubtype reference is followed to the
    parent type. The walk stops after the type whose identifier equals
    ua.ObjectIds.BaseEventType has been processed.

    Returns the list of property nodes, or None if some type on the way does
    not have exactly one parent.
    """
    properties = []
    curr_node = node

    while True:
        properties.extend(curr_node.get_properties())

        if curr_node.nodeid.Identifier == ua.ObjectIds.BaseEventType:
            break

        parents = curr_node.get_referenced_nodes(refs=ua.ObjectIds.HasSubtype, direction=ua.BrowseDirection.Inverse, includesubtypes=False)
        if len(parents) != 1:  # Something went wrong
            return None
        curr_node = parents[0]

    return properties
132
133
134
def _find_parent_eventtype(node):
    """
    Find the closest ancestor of ``node`` that has an implemented event class.

    Follows the inverse HasSubtype reference upwards, one type at a time,
    until a parent whose identifier is a key of
    ua.uaevents_auto.IMPLEMENTED_EVENTS is found.

    Returns a tuple (identifier, event class) for that ancestor, or
    (None, None) if some type on the way does not have exactly one parent.
    """
    implemented = ua.uaevents_auto.IMPLEMENTED_EVENTS
    curr_node = node

    while True:
        parents = curr_node.get_referenced_nodes(refs=ua.ObjectIds.HasSubtype, direction=ua.BrowseDirection.Inverse, includesubtypes=False)
        if len(parents) != 1:  # Something went wrong
            return None, None

        curr_node = parents[0]
        parent_id = curr_node.nodeid.Identifier
        if parent_id in implemented:
            return parent_id, implemented[parent_id]
143