Passed
Pull Request — master (#101)
by
unknown
06:41
created

OpcUaServer.generate_alarm()   A

Complexity

Conditions 2

Size

Total Lines 16
Code Lines 15

Duplication

Lines 16
Ratio 100 %

Importance

Changes 0
Metric Value
cc 2
eloc 15
nop 2
dl 16
loc 16
rs 9.65
c 0
b 0
f 0
1
# coding: utf-8
2
3
from aioconsole import ainput
4
5
import asyncio
6
7
from asyncua import Server, ua
8
9
10
class OpcUaServer(object):
11
12
    def __init__(self, endpoint):
13
        self.server = Server()
14
        self.server.set_endpoint(endpoint)
15
        self.server.set_server_name('Alarms and Conditions Test Server')
16
        self.server.application_type = ua.ApplicationType.Server
17
        self.con_gen = None
18
        self.alarm_gen = None
19
        self.alarm_gen2 = None
20
21
    async def init(self, shelf_file=None):
22
        await self.server.iserver.init(shelf_file)
23
        uri = "http://examples.freeopcua.github.io"
24
        idx = await self.server.register_namespace(uri)
25
        objects = self.server.get_objects_node()
26
27
        con_obj = await objects.add_object(idx, "ConditionObject")
28
        condition = self.server.get_node(ua.NodeId(2830))
29
        self.con_gen = await self.server.get_event_generator(condition, con_obj)
30
        self.con_gen.event.add_property('NodeId', con_obj.nodeid, ua.VariantType.NodeId)
31
32
        alarm_obj = await objects.add_object(idx, "AlarmObject")
33
        alarm = self.server.get_node(ua.NodeId(10637))
34
        self.alarm_gen = await self.server.get_event_generator(alarm, alarm_obj)
35
        self.alarm_gen2 = await self.server.get_event_generator(alarm, alarm_obj)
36
37
    def generate_condition(self, retain):
38
        self.con_gen.event.ConditionName = 'Example Condition'
39
        self.con_gen.event.Message = ua.LocalizedText("Some Message")
40
        self.con_gen.event.Severity = 500
41
        self.con_gen.event.BranchId = ua.NodeId(0)
42
        if retain == 1:
43
            self.con_gen.event.Retain = True
44
        else:
45
            self.con_gen.event.Retain = False
46
        self.con_gen.trigger()
47
48 View Code Duplication
    def generate_alarm(self, active):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
49
        self.alarm_gen.event.ConditionName = 'Example Alarm'
50
        self.alarm_gen.event.Message = ua.LocalizedText("Some Message")
51
        self.alarm_gen.event.Severity = 500
52
        self.alarm_gen.event.BranchId = ua.NodeId(0)
53
        self.alarm_gen.event.AckedState = ua.LocalizedText('Unacknowledged', 'en')
54
        setattr(self.alarm_gen.event, 'AckedState/Id', False)
55
        if active == 1:
56
            self.alarm_gen.event.Retain = True
57
            self.alarm_gen.event.ActiveState = ua.LocalizedText('Active', 'en')
58
            setattr(self.alarm_gen.event, 'ActiveState/Id', True)
59
        else:
60
            self.alarm_gen.event.Retain = False
61
            self.alarm_gen.event.ActiveState = ua.LocalizedText('Inactive', 'en')
62
            setattr(self.alarm_gen.event, 'ActiveState/Id', False)
63
        self.alarm_gen.trigger()
64
65 View Code Duplication
    def generate_alarm2(self, active):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
66
        self.alarm_gen2.event.SourceName = 'andere source'
67
        self.alarm_gen2.event.ConditionName = 'Example Alarm'
68
        self.alarm_gen2.event.Message = ua.LocalizedText("Some Message")
69
        self.alarm_gen2.event.Severity = 500
70
        self.alarm_gen2.event.BranchId = ua.NodeId(0)
71
        self.alarm_gen2.event.AckedState = ua.LocalizedText('Unacknowledged', 'en')
72
        setattr(self.alarm_gen2.event, 'AckedState/Id', False)
73
        if active == 1:
74
            self.alarm_gen2.event.Retain = True
75
            self.alarm_gen2.event.ActiveState = ua.LocalizedText('Active', 'en')
76
            setattr(self.alarm_gen2.event, 'ActiveState/Id', True)
77
        else:
78
            self.alarm_gen2.event.Retain = False
79
            self.alarm_gen2.event.ActiveState = ua.LocalizedText('Inactive', 'en')
80
            setattr(self.alarm_gen2.event, 'ActiveState/Id', False)
81
        self.alarm_gen2.trigger()
82
83
84
async def interactive(server):
85
    while True:
86
        # server.generate_condition(1)
87
        # server.generate_alarm(1)
88
        line = await ainput(">>> ")
89
        print('execute:', line)
90
        try:
91
            eval(line)
92
        except Exception as msg:
93
            print('Exception:', msg)
94
            raise Exception
95
96
97
async def main():
98
    server = OpcUaServer("opc.tcp://0.0.0.0:4840")
99
    await server.init()
100
    await server.server.start()
101
    await interactive(server)
102
103
104
if __name__ == '__main__':
105
    loop = asyncio.get_event_loop()
106
    loop.create_task(main())
107
    loop.run_forever()
108