Completed
Pull Request — master (#133)
by Denis
05:19
created

opcua.common.EventGenerator.__str__()   A

Complexity

Conditions 1

Size

Total Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 1
Metric Value
cc 1
dl 0
loc 2
ccs 1
cts 1
cp 1
crap 1
rs 10
1
2 1
from datetime import datetime
3
4 1
from opcua import ua
5 1
from opcua import Node
6 1
import uuid
7
8
9 1
class EventGenerator(object):
    """
    Hold an event object and send it to subscribed clients on demand.

    If no event is supplied, a new ``ua.BaseEvent`` is created for this
    generator. When the event's ``SourceNode`` has a non-empty identifier,
    the constructor copies the source node's display name into
    ``event.SourceName`` and writes ``1`` to the node's ``EventNotifier``
    attribute.

    Arguments to constructor are:

        isession: The InternalSession object to use for node access and
            event triggering

        event: The event object to fill in and send. Defaults to a fresh
            ``ua.BaseEvent()`` per generator.
    """

    def __init__(self, isession, event=None):
        # Build the default here, not in the signature: a default evaluated
        # at definition time would be one event object shared (and mutated)
        # by every generator created without an explicit event.
        if event is None:
            event = ua.BaseEvent()
        self.isession = isession
        self.event = event

        self.source = Node(isession, self.event.SourceNode)
        if self.event.SourceNode.Identifier:
            self.event.SourceName = self.source.get_display_name().Text
            # Write 1 to the EventNotifier attribute of the source node.
            # NOTE(review): presumably this marks the node as able to emit
            # events -- confirm against the OPC UA EventNotifier bit definitions.
            self.source.set_attribute(ua.AttributeIds.EventNotifier, ua.DataValue(ua.Variant(1, ua.VariantType.Byte)))

    def __str__(self):
        """Return a short text representation that embeds the held event."""
        return "EventGenerator(Event:{})".format(self.event)
    __repr__ = __str__

    def trigger(self, time=None, message=None):
        """
        Trigger the event. This will send a notification to all subscribed clients

        Arguments are:

            time: value stored in ``event.Time``; when omitted (or falsy)
                the current UTC time is used

            message: text for the event message; when omitted and the event
                has no message yet, the browse name of the source node is
                used, provided the source node identifier is non-empty
        """
        # A new random id is assigned on every trigger.
        self.event.EventId = ua.Variant(uuid.uuid4().hex, ua.VariantType.ByteString)
        if time:
            self.event.Time = time
        else:
            self.event.Time = datetime.utcnow()
        # NOTE(review): "ReciveTime" looks like a misspelling of "ReceiveTime";
        # kept as-is because the event class definition is not visible here --
        # confirm which attribute name the event type actually declares.
        self.event.ReciveTime = datetime.utcnow()
        # FIXME: LocalTime is wrong but currently know better. For description s. Part 5 page 18
        # NOTE(review): the attribute set below is "LocaleTime" while the note
        # above says "LocalTime" -- confirm the intended attribute name.
        self.event.LocaleTime = datetime.utcnow()
        if message:
            # Store the message on the event that is sent, not on the generator.
            self.event.Message = ua.LocalizedText(message)
        elif not self.event.Message and self.event.SourceNode.Identifier:
            self.event.Message = ua.LocalizedText(self.source.get_browse_name().Text)
        self.isession.subscription_service.trigger_event(self.event)
55