Completed
Pull Request — master (#133)
by Denis
02:36
created

opcua.common.Event.__init__()   B

Complexity

Conditions 5

Size

Total Lines 34

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 22
CRAP Score 5.0909
Metric Value
cc 5
dl 0
loc 34
ccs 22
cts 26
cp 0.8462
crap 5.0909
rs 8.0894

2 Methods

Rating   Name   Duplication   Size   Complexity  
A opcua.common.EventGenerator.__str__() 0 2 1
B opcua.common.EventGenerator.trigger() 0 17 5
1
2 1
from datetime import datetime
3
4 1
from opcua import ua
5 1
from opcua import Node
6 1
import uuid
7
8
9 1
class EventGenerator(object):
    """
    Create and trigger events based on an event object.

    When no event is given, a fresh ``ua.BaseEvent`` is used. The event's
    fields are filled in and handed to the session's subscription service
    each time :meth:`trigger` is called.

    Arguments to constructor are:

        isession: The InternalSession object to use for query and event
            triggering

        event: The event object to send. Defaults to a new ``ua.BaseEvent``
            created per generator instance.
    """

    def __init__(self, isession, event=None):
        self.isession = isession
        # Build the default event per instance. A default evaluated in the
        # signature would be created once and shared (and mutated by
        # trigger()) across every generator that relies on it.
        if event is None:
            event = ua.BaseEvent()
        self.event = event

        self.source = Node(isession, self.event.SourceNode)
        if self.event.SourceNode.Identifier:
            self.event.SourceName = self.source.get_display_name().Text
            # Write 1 (as a Byte) to the source node's EventNotifier attribute;
            # presumably the "subscribe to events" bit -- confirm against spec.
            self.source.set_attribute(
                ua.AttributeIds.EventNotifier,
                ua.DataValue(ua.Variant(1, ua.VariantType.Byte)),
            )

    def __str__(self):
        return "EventGenerator(Event:{})".format(self.event)
    __repr__ = __str__

    def trigger(self, time=None, message=None):
        """
        Trigger the event.

        Fills in a new EventId and the time fields on the event, sets its
        message, and passes the event to the session's subscription service.

        time: value to store as the event's Time; the current UTC time is
            used when None.
        message: text for the event's Message. When empty or None and the
            event has no message yet, the browse name of the source node is
            used (only if the event has a source node identifier).
        """
        now = datetime.utcnow()
        self.event.EventId = ua.Variant(uuid.uuid4().hex, ua.VariantType.ByteString)
        self.event.Time = time if time is not None else now
        # NOTE(review): "ReciveTime" and "LocaleTime" look like misspellings of
        # the OPC UA names "ReceiveTime" / "LocalTime"; kept as-is because the
        # attribute names declared by ua.BaseEvent are not visible here.
        self.event.ReciveTime = now
        # FIXME: LocalTime is wrong but currently know better. For description s. Part 5 page 18
        self.event.LocaleTime = now
        if message:
            # Store the message on the event itself so it is actually sent.
            self.event.Message = ua.LocalizedText(message)
        elif not self.event.Message and self.event.SourceNode.Identifier:
            self.event.Message = ua.LocalizedText(self.source.get_browse_name().Text)
        self.isession.subscription_service.trigger_event(self.event)
53