Passed
Pull Request — master (#101)
by
unknown
04:13
created

server-alarms-and-conditions.main()   A

Complexity

Conditions 1

Size

Total Lines 5
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 5
nop 0
dl 0
loc 5
rs 10
c 0
b 0
f 0
1
# coding: utf-8
2
3
from aioconsole import ainput
4
5
import asyncio
6
7
from asyncua import Server, ua
8
9
10
class OpcUaServer(object):
11
12
    def __init__(self, endpoint):
13
        self.server = Server()
14
        self.server.set_endpoint(endpoint)
15
        self.server.set_server_name('Alarms and Conditions Test Server')
16
        self.server.application_type = ua.ApplicationType.Server
17
        self.con_gen = None
18
        self.alarm_gen = None
19
20
    async def init(self, shelf_file=None):
21
        await self.server.iserver.init(shelf_file)
22
        uri = "http://examples.freeopcua.github.io"
23
        idx = await self.server.register_namespace(uri)
24
        objects = self.server.get_objects_node()
25
26
        con_obj = await objects.add_object(idx, "ConditionObject")
27
        condition = self.server.get_node(ua.NodeId(2830))
28
        self.con_gen = await self.server.get_event_generator(condition, con_obj)
29
        self.con_gen.event.add_property('NodeId', con_obj.nodeid, ua.VariantType.NodeId)
30
31
        alarm_obj = await objects.add_object(idx, "AlarmObject")
32
        alarm = self.server.get_node(ua.NodeId(10637))
33
        self.alarm_gen = await self.server.get_event_generator(alarm, alarm_obj)
34
35
    def generate_condition(self, retain):
36
        self.con_gen.event.ConditionName = 'Example Condition'
37
        self.con_gen.event.Message = ua.LocalizedText("Some Message")
38
        self.con_gen.event.Severity = 500
39
        self.con_gen.event.BranchId = ua.NodeId(0)
40
        if retain == 1:
41
            self.con_gen.event.Retain = True
42
        else:
43
            self.con_gen.event.Retain = False
44
        self.con_gen.trigger()
45
46
    def generate_alarm(self, active):
47
        self.alarm_gen.event.ConditionName = 'Example Alarm'
48
        self.alarm_gen.event.Message = ua.LocalizedText("Some Message")
49
        self.alarm_gen.event.Severity = 500
50
        self.alarm_gen.event.BranchId = ua.NodeId(0)
51
        self.alarm_gen.event.AckedState = ua.LocalizedText('Unacknowledged', 'en')
52
        setattr(self.alarm_gen.event, 'AckedState/Id', False)
53
        if active == 1:
54
            self.alarm_gen.event.Retain = True
55
            self.alarm_gen.event.ActiveState = ua.LocalizedText('Active', 'en')
56
            setattr(self.alarm_gen.event, 'ActiveState/Id', True)
57
        else:
58
            self.alarm_gen.event.Retain = False
59
            self.alarm_gen.event.ActiveState = ua.LocalizedText('Inactive', 'en')
60
            setattr(self.alarm_gen.event, 'ActiveState/Id', False)
61
        self.alarm_gen.trigger()
62
63
64
async def interactive(server):
65
    while True:
66
        # server.generate_condition(1)
67
        # server.generate_alarm(1)
68
        line = await ainput(">>> ")
69
        print('execute:', line)
70
        try:
71
            eval(line)
72
        except Exception as msg:
73
            print('Exception:', msg)
74
            raise Exception
75
76
77
async def main():
78
    server = OpcUaServer("opc.tcp://0.0.0.0:4840")
79
    await server.init()
80
    await server.server.start()
81
    await interactive(server)
82
83
84
if __name__ == '__main__':
85
    loop = asyncio.get_event_loop()
86
    loop.create_task(main())
87
    loop.run_forever()
88