Completed
Pull Request — master (#180)
by
unknown
03:21
created

EventGenerator.__init__()   F

Complexity

Conditions 10

Size

Total Lines 46

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 36
CRAP Score 10.0018

Importance

Changes 2
Bugs 0 Features 0
Metric Value
cc 10
c 2
b 0
f 0
dl 0
loc 46
ccs 36
cts 37
cp 0.973
crap 10.0018
rs 3.2727

How to fix   Complexity   

Complexity

Complex classes like EventGenerator.__init__() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1 1
import logging
2 1
from datetime import datetime
3
4 1
from opcua import ua
5 1
from opcua import Node
6 1
import uuid
7
8
9 1
class EventGenerator(object):
    """
    Create an event based on an event type. BaseEventType is used by default.
    Object members are dynamically created from the base event type and sent
    to the client when the event is triggered (see example code in source).

    Arguments to constructor are:

        isession: The InternalSession object to use for query and event triggering

        etype: The event type, either a ua.BaseEvent instance, an objectId,
               a NodeId or a Node object

        source: The emitting source for the node, either an objectId, NodeId or a Node
    """

    def __init__(self, isession, etype=None, source=ua.ObjectIds.Server):
        """
        Resolve the event and its source node, then register the source.

        If the event already carries a SourceNode that differs from *source*,
        the event's SourceNode wins and a warning is logged. The source node
        gets its EventNotifier attribute set and a GeneratesEvent reference
        to the event type.

        If the event type cannot be resolved, get_event_from_type_node()
        yields None and the first attribute access on it raises
        AttributeError.
        """
        if not etype:
            etype = ua.BaseEvent()

        self.logger = logging.getLogger(__name__)
        self.isession = isession
        self.event = self._resolve_event(etype)

        source = self._to_node(source)
        if self.event.SourceNode and source.nodeid != self.event.SourceNode:
            self.logger.warning("Source NodeId: '%s' and event SourceNode: '%s' are not the same. Using '%s' as SourceNode", source.nodeid, self.event.SourceNode, self.event.SourceNode)
            source = Node(self.isession, self.event.SourceNode)

        self.event.SourceNode = source.nodeid
        self.event.SourceName = source.get_display_name().Text

        self._register_source(source)

    def _to_node(self, target):
        """Return *target* (a Node, a NodeId or a raw identifier) as a Node of this session."""
        if isinstance(target, Node):
            return target
        if isinstance(target, ua.NodeId):
            return Node(self.isession, target)
        return Node(self.isession, ua.NodeId(target))

    def _resolve_event(self, etype):
        """Return *etype* itself if it is an event instance, else build one from its type node."""
        if isinstance(etype, ua.BaseEvent):
            return etype
        return get_event_from_type_node(self._to_node(etype))

    def _register_source(self, source):
        """Set the EventNotifier attribute on *source* and link it to the event type."""
        source.set_attribute(ua.AttributeIds.EventNotifier, ua.DataValue(ua.Variant(1, ua.VariantType.Byte)))

        ref = ua.AddReferencesItem()
        ref.IsForward = True
        ref.ReferenceTypeId = ua.NodeId(ua.ObjectIds.GeneratesEvent)
        ref.SourceNodeId = source.nodeid
        ref.TargetNodeClass = ua.NodeClass.ObjectType
        ref.TargetNodeId = self.event.EventType
        # NOTE(review): the value returned by add_references() is not
        # inspected, so a rejected reference goes unnoticed here.
        self.isession.add_references([ref])

    def __str__(self):
        return "EventGenerator(Type:{}, Source:{}, Time:{}, Message: {})".format(
            self.event.EventType,
            self.event.SourceNode,
            self.event.Time,
            self.event.Message)

    __repr__ = __str__

    def trigger(self, time=None, message=None):
        """
        Trigger the event. This will send a notification to all subscribed clients

        time: value stored as the event Time; the current UTC time is used
              when it is not given
        message: text for the event Message; when it is not given and the
                 event has no Message yet, the browse name of the source
                 node is used
        """
        # Read the clock once so every timestamp of this trigger agrees.
        now = datetime.utcnow()

        # NOTE(review): uuid4().hex is a text string while the variant type is
        # ByteString -- confirm the encoder accepts it on Python 3.
        self.event.EventId = ua.Variant(uuid.uuid4().hex, ua.VariantType.ByteString)
        self.event.Time = time if time else now
        self.event.ReceiveTime = now
        # FIXME: LocalTime is wrong, but we currently do not know better.
        # For a description see Part 5 page 18
        self.event.LocalTime = now

        if message:
            self.event.Message = ua.LocalizedText(message)
        elif not self.event.Message:
            source = Node(self.isession, self.event.SourceNode)
            self.event.Message = ua.LocalizedText(source.get_browse_name().Text)

        self.isession.subscription_service.trigger_event(self.event)
97
98
99 1
def get_event_from_type_node(node):
    """
    Return an event instance for the event type identified by *node*.

    When the identifier of *node* is a key of
    ua.uaevents_auto.IMPLEMENTED_EVENTS, the registered class is
    instantiated directly. Otherwise a CustomEvent class is derived from
    the nearest registered ancestor type and filled with the property
    values found on *node* and on each type between it and that ancestor.

    Returns None when no registered ancestor type can be found.
    """
    type_id = node.nodeid.Identifier
    known_events = ua.uaevents_auto.IMPLEMENTED_EVENTS
    if type_id in known_events:
        return known_events[type_id]()

    parent_identifier, parent_eventtype = _find_parent_eventtype(node)
    if not parent_eventtype:
        return None

    class CustomEvent(parent_eventtype):

        def __init__(self):
            super(CustomEvent, self).__init__(extended=True)
            self.EventType = node.nodeid

            # Walk up the HasSubtype chain, copying the properties of every
            # type below the registered ancestor onto this event.
            current = node
            while current.nodeid.Identifier != parent_identifier:
                for prop in current.get_properties():
                    setattr(self, prop.get_browse_name().Name, prop.get_value())

                supertypes = current.get_referenced_nodes(
                    refs=ua.ObjectIds.HasSubtype,
                    direction=ua.BrowseDirection.Inverse,
                    includesubtypes=False)
                if len(supertypes) != 1:
                    # Something went wrong: stop here, leaving _freeze unset.
                    return
                current = supertypes[0]

            self._freeze = True

    return CustomEvent()
125
126
127 1
def _find_parent_eventtype(node):
    """
    Find the nearest ancestor type of *node* that has a registered event class.

    Follows inverse HasSubtype references upwards, one level per call.
    Returns a tuple (identifier, event class) taken from
    ua.uaevents_auto.IMPLEMENTED_EVENTS, or (None, None) as soon as a type
    does not have exactly one parent.
    """
    supertypes = node.get_referenced_nodes(
        refs=ua.ObjectIds.HasSubtype,
        direction=ua.BrowseDirection.Inverse,
        includesubtypes=False)

    if len(supertypes) != 1:
        # Something went wrong
        return None, None

    supertype = supertypes[0]
    identifier = supertype.nodeid.Identifier
    known_events = ua.uaevents_auto.IMPLEMENTED_EVENTS
    if identifier in known_events:
        return identifier, known_events[identifier]

    # Not registered at this level: keep climbing.
    return _find_parent_eventtype(supertype)
136