Passed
Pull Request — master (#101)
by
unknown
02:51
created

asyncua.server.event_generator   A

Complexity

Total Complexity 20

Size/Duplication

Total Lines 113
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 20
eloc 76
dl 0
loc 113
rs 10
c 0
b 0
f 0

4 Methods

Rating   Name   Duplication   Size   Complexity  
A EventGenerator.__init__() 0 5 1
A EventGenerator.__str__() 0 3 1
D EventGenerator.init() 0 50 12
B EventGenerator.trigger() 0 26 6
1
import logging
2
from datetime import datetime
3
import time
4
import uuid
5
from typing import Optional
6
import sys
7
8
from asyncua import ua
9
from ..common import events, event_objects, Node
10
11
12
class EventGenerator:
    """
    Create an event based on an event type. Per default is BaseEventType used.
    Object members are dynamically created from the base event type and sent to
    the client when the event is triggered (see example code in source).

    Argument to the constructor:
        isession: The InternalSession object to use for query and event triggering

    Arguments to init():
        etype: The event type, either an event object, an objectId, a NodeId or a Node object
        emitting_node: The emitting source for the event, either an objectId, a NodeId or a Node
    """

    def __init__(self, isession):
        self.logger = logging.getLogger(__name__)
        self.isession = isession
        # Both attributes stay None until init() has been awaited.
        self.event: Optional[event_objects.BaseEvent] = None
        self.emitting_node: Optional[Node] = None

    def _as_node(self, target):
        """Return *target* (a Node, a NodeId or a raw identifier) as a Node bound to this session."""
        if isinstance(target, Node):
            return target
        if isinstance(target, ua.NodeId):
            return Node(self.isession, target)
        return Node(self.isession, ua.NodeId(target))

    async def init(self, etype=None, emitting_node=ua.ObjectIds.Server):
        """
        Resolve the event object and register the emitting node.

        etype may be a ready-made event object, a Node, a NodeId or a raw
        identifier of the event type node; None selects a plain BaseEvent.
        emitting_node may be a Node, a NodeId or a raw identifier.
        """
        # Normalise the emitter first so every later step can rely on a Node,
        # whatever form the caller passed in (including the integer default).
        emitting_node = self._as_node(emitting_node)

        if etype is None:
            # Documented default: fall back to the base event type.
            self.event = event_objects.BaseEvent()
        elif isinstance(etype, event_objects.BaseEvent):
            self.event = etype
        else:
            self.event = await events.get_event_obj_from_type_node(self._as_node(etype))
            if isinstance(self.event, event_objects.Condition):
                # we need this so that it works properly for conditions and alarms aka ConditionId
                self.event.add_property('NodeId', emitting_node.nodeid, ua.VariantType.NodeId)

        self.event.emitting_node = emitting_node.nodeid
        if not self.event.SourceNode:
            self.event.SourceNode = emitting_node.nodeid
        if not self.event.SourceName:
            source = Node(self.isession, self.event.SourceNode)
            self.event.SourceName = (await source.get_browse_name()).Name

        await emitting_node.set_event_notifier([ua.EventNotifier.SubscribeToEvents])

        # Add a forward GeneratesEvent reference from the emitter to the event type.
        ref = ua.AddReferencesItem()
        ref.IsForward = True
        ref.ReferenceTypeId = ua.NodeId(ua.ObjectIds.GeneratesEvent)
        ref.SourceNodeId = emitting_node.nodeid
        ref.TargetNodeClass = ua.NodeClass.ObjectType
        ref.TargetNodeId = self.event.EventType
        # The returned per-item results are intentionally not checked here.
        await self.isession.add_references([ref])

        self.emitting_node = emitting_node

    def __str__(self):
        # getattr keeps str()/repr() usable before init() has populated self.event.
        event = self.event
        return "EventGenerator(Type:{0}, Emitting Node:{1}, Time:{2}, Message: {3})".format(
            getattr(event, 'EventType', None), self.emitting_node,
            getattr(event, 'Time', None), getattr(event, 'Message', None))

    __repr__ = __str__

    def trigger(self, time_attr=None, message=None):
        """
        Trigger the event. This will send a notification to all subscribed clients

        time_attr: value stored as the event Time; the current UTC time is used when omitted
        message: text for the event Message; when omitted and the event has no
            Message yet, the event's SourceName is used
        """
        # Every trigger gets a fresh random EventId.
        self.event.EventId = ua.Variant(uuid.uuid4().hex.encode('utf-8'), ua.VariantType.ByteString)
        if time_attr:
            self.event.Time = time_attr
        else:
            self.event.Time = datetime.utcnow()
        self.event.ReceiveTime = datetime.utcnow()

        self.event.LocalTime = ua.uaprotocol_auto.TimeZoneDataType()
        localtime = time.localtime(self.event.Time.timestamp())
        # tm_gmtoff is in seconds; Offset is stored in minutes.
        self.event.LocalTime.Offset = localtime.tm_gmtoff // 60
        self.event.LocalTime.DaylightSavingInOffset = bool(localtime.tm_isdst != -1)

        if message:
            self.event.Message = ua.LocalizedText(message)
        elif not self.event.Message:
            # get_browse_name() is a coroutine and cannot be awaited from this
            # synchronous method; init() already stored the source's browse
            # name in SourceName, so reuse it.
            self.event.Message = ua.LocalizedText(self.event.SourceName)

        self.isession.subscription_service.trigger_event(self.event)
113