Passed
Pull Request — master (#365)
by
unknown
02:53
created

OpcUaServer.generate_condition()   A

Complexity

Conditions 2

Size

Total Lines 10
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 9
nop 2
dl 0
loc 10
rs 9.95
c 0
b 0
f 0
1
# coding: utf-8
2
3
from aioconsole import ainput
4
import logging
5
6
import asyncio
7
8
from asyncua import Server, ua
9
10
11
class OpcUaServer(object):
12
13
    def __init__(self, endpoint):
14
        self.server = Server()
15
        self.server.set_endpoint(endpoint)
16
        self.server.set_server_name('Alarms and Conditions Test Server')
17
        self.server.application_type = ua.ApplicationType.Server
18
        self.server.iserver.bind_condition_methods = 500
19
        self.con_gen = None
20
        self.alarm_gen = None
21
22
    async def __aenter__(self):
23
        await self.init()
24
        await self.server.start()
25
        return self
26
27
    async def __aexit__(self, exc_type, exc_value, traceback):
28
        await self.server.stop()
29
30
    async def init(self):
31
        await self.server.init()
32
        uri = "http://examples.freeopcua.github.io"
33
        idx = await self.server.register_namespace(uri)
34
        objects = self.server.get_objects_node()
35
36
        noti_node = await objects.add_object(idx, 'NotifierObject')
37
38
        con_obj = await noti_node.add_object(idx, "ConditionObject")
39
        condition = self.server.get_node(ua.NodeId(2830))
40
        self.con_gen = await self.server.get_event_generator(condition, con_obj,
41
                                                             notifier_path=[ua.ObjectIds.Server, noti_node, con_obj])
42
        self.con_gen.event.add_property('NodeId', con_obj.nodeid, ua.VariantType.NodeId)
43
44
        alarm_obj = await noti_node.add_object(idx, "AlarmObject")
45
        alarm = self.server.get_node(ua.NodeId(10637))
46
        self.alarm_gen = await self.server.get_event_generator(alarm, alarm_obj,
47
                                                               notifier_path=[ua.ObjectIds.Server, noti_node, alarm_obj])
48
49
    def generate_condition(self, retain):
50
        self.con_gen.event.ConditionName = 'Example Condition'
51
        self.con_gen.event.Message = ua.LocalizedText("Some Message")
52
        self.con_gen.event.Severity = 500
53
        self.con_gen.event.BranchId = ua.NodeId(0)
54
        if retain == 1:
55
            self.con_gen.event.Retain = True
56
        else:
57
            self.con_gen.event.Retain = False
58
        self.con_gen.trigger()
59
60
    def generate_alarm(self, active):
61
        self.alarm_gen.event.ConditionName = 'Example Alarm1'
62
        self.alarm_gen.event.Message = ua.LocalizedText("hello from python")
63
        self.alarm_gen.event.Severity = 500
64
        self.alarm_gen.event.BranchId = ua.NodeId(0)
65
        self.alarm_gen.event.AckedState = ua.LocalizedText('Unacknowledged', 'en')
66
        setattr(self.alarm_gen.event, 'AckedState/Id', False)
67
        if active == 1:
68
            self.alarm_gen.event.Retain = True
69
            self.alarm_gen.event.ActiveState = ua.LocalizedText('Active', 'en')
70
            setattr(self.alarm_gen.event, 'ActiveState/Id', True)
71
        else:
72
            self.alarm_gen.event.Retain = False
73
            self.alarm_gen.event.ActiveState = ua.LocalizedText('Inactive', 'en')
74
            setattr(self.alarm_gen.event, 'ActiveState/Id', False)
75
        self.alarm_gen.trigger()
76
77
78
async def interactive(server):
79
    while True:
80
        # server.generate_condition(1)
81
        server.generate_alarm(1)
82
        logging.warning('sent alarm')
83
        await asyncio.sleep(5)
84
        # # line = await ainput(">>> ")
85
        # # print('execute:', line)
86
        # # if line == 'exit':
87
        # #     break
88
        # # try:
89
        # #     eval(line)
90
        # except Exception as msg:
91
        #     print('Exception:', msg)
92
        #     raise Exception
93
94
95
async def main():
96
    async with OpcUaServer("opc.tcp://0.0.0.0:4839") as server:
97
        await interactive(server)
98
99
100
if __name__ == '__main__':
101
    loop = asyncio.get_event_loop()
102
    loop.create_task(main())
103
    try:
104
        loop.run_forever()
105
    finally:
106
        loop.run_until_complete(loop.shutdown_asyncgens())
107
        loop.close()
108