Passed
Pull Request — master (#101)
by
unknown
02:37
created

server-alarms-and-conditions   A

Complexity

Total Complexity 13

Size/Duplication

Total Lines 98
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 73
dl 0
loc 98
rs 10
c 0
b 0
f 0
wmc 13

6 Methods

Rating   Name   Duplication   Size   Complexity  
A OpcUaServer.__init__() 0 7 1
A OpcUaServer.generate_condition() 0 10 2
A OpcUaServer.__aenter__() 0 4 1
A OpcUaServer.__aexit__() 0 2 1
A OpcUaServer.generate_alarm() 0 16 2
A OpcUaServer.init() 0 14 1

2 Functions

Rating   Name   Duplication   Size   Complexity  
A interactive() 0 11 3
A main() 0 3 2
1
# coding: utf-8
2
3
from aioconsole import ainput
4
5
import asyncio
6
7
from asyncua import Server, ua
8
9
10
class OpcUaServer(object):
11
12
    def __init__(self, endpoint):
13
        self.server = Server()
14
        self.server.set_endpoint(endpoint)
15
        self.server.set_server_name('Alarms and Conditions Test Server')
16
        self.server.application_type = ua.ApplicationType.Server
17
        self.con_gen = None
18
        self.alarm_gen = None
19
20
    async def __aenter__(self):
21
        await self.init()
22
        await self.server.start()
23
        return self
24
25
    async def __aexit__(self, exc_type, exc_value, traceback):
26
        await self.server.stop()
27
28
    async def init(self, shelf_file=None):
29
        await self.server.iserver.init(shelf_file)
30
        uri = "http://examples.freeopcua.github.io"
31
        idx = await self.server.register_namespace(uri)
32
        objects = self.server.get_objects_node()
33
34
        con_obj = await objects.add_object(idx, "ConditionObject")
35
        condition = self.server.get_node(ua.NodeId(2830))
36
        self.con_gen = await self.server.get_event_generator(condition, con_obj)
37
        self.con_gen.event.add_property('NodeId', con_obj.nodeid, ua.VariantType.NodeId)
38
39
        alarm_obj = await objects.add_object(idx, "AlarmObject")
40
        alarm = self.server.get_node(ua.NodeId(10637))
41
        self.alarm_gen = await self.server.get_event_generator(alarm, alarm_obj)
42
43
    def generate_condition(self, retain):
44
        self.con_gen.event.ConditionName = 'Example Condition'
45
        self.con_gen.event.Message = ua.LocalizedText("Some Message")
46
        self.con_gen.event.Severity = 500
47
        self.con_gen.event.BranchId = ua.NodeId(0)
48
        if retain == 1:
49
            self.con_gen.event.Retain = True
50
        else:
51
            self.con_gen.event.Retain = False
52
        self.con_gen.trigger()
53
54
    def generate_alarm(self, active):
55
        self.alarm_gen.event.ConditionName = 'Example Alarm'
56
        self.alarm_gen.event.Message = ua.LocalizedText("Some Message")
57
        self.alarm_gen.event.Severity = 500
58
        self.alarm_gen.event.BranchId = ua.NodeId(0)
59
        self.alarm_gen.event.AckedState = ua.LocalizedText('Unacknowledged', 'en')
60
        setattr(self.alarm_gen.event, 'AckedState/Id', False)
61
        if active == 1:
62
            self.alarm_gen.event.Retain = True
63
            self.alarm_gen.event.ActiveState = ua.LocalizedText('Active', 'en')
64
            setattr(self.alarm_gen.event, 'ActiveState/Id', True)
65
        else:
66
            self.alarm_gen.event.Retain = False
67
            self.alarm_gen.event.ActiveState = ua.LocalizedText('Inactive', 'en')
68
            setattr(self.alarm_gen.event, 'ActiveState/Id', False)
69
        self.alarm_gen.trigger()
70
71
72
async def interactive(server):
73
    while True:
74
        # server.generate_condition(1)
75
        # server.generate_alarm(1)
76
        line = await ainput(">>> ")
77
        print('execute:', line)
78
        try:
79
            eval(line)
80
        except Exception as msg:
81
            print('Exception:', msg)
82
            raise Exception
83
84
85
async def main():
86
    async with OpcUaServer("opc.tcp://0.0.0.0:4840") as server:
87
        await interactive(server)
88
89
90
if __name__ == '__main__':
91
    loop = asyncio.get_event_loop()
92
    loop.create_task(main())
93
    try:
94
        loop.run_forever()
95
    finally:
96
        loop.run_until_complete(loop.shutdown_asyncgens())
97
        loop.close()
98