Test Failed
Pull Request — master (#112)
by
unknown
02:26
created

OpcUaServer.__init__()   A

Complexity

Conditions 1

Size

Total Lines 8
Code Lines 8

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 8
nop 2
dl 0
loc 8
rs 10
c 0
b 0
f 0
1
# coding: utf-8
2
3
from aioconsole import ainput
4
5
import asyncio
6
7
from asyncua import Server, ua
8
9
10
class OpcUaServer(object):
    """Test OPC UA server exposing one condition and one alarm event source.

    Intended to be used as an async context manager: entering initialises
    the address space and starts the server, leaving stops it.
    """

    def __init__(self, endpoint):
        """Create and configure the underlying server (it is not started here).

        Args:
            endpoint: Endpoint URL passed to ``Server.set_endpoint``.
        """
        self.server = Server()
        self.server.set_endpoint(endpoint)
        self.server.set_server_name('Alarms and Conditions Test Server')
        self.server.application_type = ua.ApplicationType.Server
        # NOTE(review): this assigns 500 to an attribute of the internal
        # server; what the value means is not visible here -- confirm.
        self.server.iserver.bind_condition_methods = 500
        # Event generators; populated by init().
        self.con_gen = None
        self.alarm_gen = None
        # Strong references to scheduled trigger tasks so they are not
        # garbage-collected before they complete.
        self._pending_triggers = set()

    async def __aenter__(self):
        """Build the address space, start the server and return ``self``."""
        await self.init()
        await self.server.start()
        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        """Stop the server; exceptions from the ``async with`` body propagate."""
        await self.server.stop()

    async def init(self):
        """Initialise the server and create the condition/alarm generators.

        Registers the example namespace, adds ``NotifierObject`` under the
        objects node with ``ConditionObject`` and ``AlarmObject`` below it,
        and creates one event generator per object.
        """
        await self.server.init()
        uri = "http://examples.freeopcua.github.io"
        idx = await self.server.register_namespace(uri)
        objects = self.server.get_objects_node()

        noti_node = await objects.add_object(idx, 'NotifierObject')

        # Condition source: events are based on the type node with id 2830.
        con_obj = await noti_node.add_object(idx, "ConditionObject")
        condition = self.server.get_node(ua.NodeId(2830))
        self.con_gen = await self.server.get_event_generator(
            condition, con_obj,
            notifier_path=[ua.ObjectIds.Server, noti_node, con_obj])
        self.con_gen.event.add_property('NodeId', con_obj.nodeid, ua.VariantType.NodeId)

        # Alarm source: events are based on the type node with id 10637.
        alarm_obj = await noti_node.add_object(idx, "AlarmObject")
        alarm = self.server.get_node(ua.NodeId(10637))
        self.alarm_gen = await self.server.get_event_generator(
            alarm, alarm_obj,
            notifier_path=[ua.ObjectIds.Server, noti_node, alarm_obj])

    def _fire(self, generator):
        """Trigger *generator*, scheduling the call if it returns a coroutine.

        asyncua documents ``EventGenerator.trigger`` as a coroutine; calling
        it without awaiting would never send the event. Scheduling it keeps
        the public ``generate_*`` methods synchronous. A plain synchronous
        ``trigger`` is simply called.
        """
        outcome = generator.trigger()
        if asyncio.iscoroutine(outcome):
            task = asyncio.ensure_future(outcome)
            self._pending_triggers.add(task)
            task.add_done_callback(self._pending_triggers.discard)

    def generate_condition(self, retain):
        """Fill in the condition event and trigger it.

        Args:
            retain: The event's ``Retain`` flag is set to True when this
                equals 1, otherwise to False.
        """
        event = self.con_gen.event
        event.ConditionName = 'Example Condition'
        event.Message = ua.LocalizedText("Some Message")
        event.Severity = 500
        event.BranchId = ua.NodeId(0)
        event.Retain = bool(retain == 1)
        self._fire(self.con_gen)

    def generate_alarm(self, active):
        """Fill in the alarm event and trigger it.

        The alarm is always reported as unacknowledged.

        Args:
            active: When equal to 1 the alarm is reported as active and
                retained; otherwise as inactive and not retained.
        """
        event = self.alarm_gen.event
        event.ConditionName = 'Example Alarm1'
        event.Message = ua.LocalizedText("error in module1")
        event.Severity = 500
        event.BranchId = ua.NodeId(0)
        event.AckedState = ua.LocalizedText('Unacknowledged', 'en')
        # The field names contain '/', so they cannot be set with dot syntax.
        setattr(event, 'AckedState/Id', False)

        is_active = bool(active == 1)
        event.Retain = is_active
        event.ActiveState = ua.LocalizedText('Active' if is_active else 'Inactive', 'en')
        setattr(event, 'ActiveState/Id', is_active)
        self._fire(self.alarm_gen)
75
76
77
async def interactive(server):
    """Read lines from the console and evaluate them until 'exit' is entered.

    Each line is evaluated as a Python expression inside this function, so
    the ``server`` argument is reachable by name, e.g.
    ``server.generate_condition(1)`` or ``server.generate_alarm(1)``.

    Args:
        server: Object made available to the evaluated expressions.

    Raises:
        Exception: Whatever the evaluated expression raised; it is printed
            first and then re-raised unchanged.
    """
    while True:
        line = await ainput(">>> ")
        print('execute:', line)
        if line == 'exit':
            break
        try:
            # SECURITY: eval() executes arbitrary code typed at the prompt.
            # Acceptable only for a local test console; never expose this
            # loop to untrusted input.
            eval(line)
        except Exception as msg:
            print('Exception:', msg)
            # Re-raise the original error so its type, message and traceback
            # are preserved (a bare `Exception` would hide the cause).
            raise
90
91
92
async def main():
    """Run the test server on port 4840 and hand control to the console loop.

    The server is stopped when the console loop returns or raises.
    """
    endpoint = "opc.tcp://0.0.0.0:4840"
    async with OpcUaServer(endpoint) as running_server:
        await interactive(running_server)
95
96
97
if __name__ == '__main__':
    # asyncio.run() returns as soon as main() finishes (e.g. after 'exit'),
    # whereas create_task() + run_forever() kept the process alive forever.
    # It also shuts down async generators and closes the loop on the way out.
    asyncio.run(main())
105