Passed
Pull Request — master (#365)
by
unknown
02:53
created

SubHandler.datachange_notification()   A

Complexity

Conditions 1

Size

Total Lines 2
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 2
nop 4
dl 0
loc 2
rs 10
c 0
b 0
f 0
1
# coding: utf-8
2
3
from aioconsole import ainput
4
5
import asyncio
6
7
from asyncua import Client, ua
8
9
10
class SubHandler(object):
    """Subscription handler that prints every notification it is given.

    An instance is handed to ``Client.create_subscription`` elsewhere in
    this file; the methods below are presumably the callbacks the asyncua
    subscription invokes on it (confirm against the asyncua docs).
    """

    def datachange_notification(self, node, val, data):
        """Print a data-change notification.

        Args:
            node: First value printed after the message prefix.
            val: Second value printed.
            data: Third value printed.
        """
        print("Python: New data change event", node, val, data)

    def event_notification(self, event):
        """Print an event notification.

        Args:
            event: The object printed after the message prefix.
        """
        print("Python new event:", event)
17
18
19
class OpcUaClient(object):
    """Async context manager wrapping an asyncua ``Client``.

    Entering the context connects to the server; leaving it removes every
    subscription recorded in ``subscriptions`` and then disconnects.
    """

    def __init__(self, endpoint):
        """Create the (not yet connected) client.

        Args:
            endpoint: Server URL, passed unchanged to ``Client``.
        """
        self.client = Client(endpoint)
        # Maps each subscription created by init() to the list of handles
        # returned by its subscribe_events() calls.
        self.subscriptions = {}

    async def __aenter__(self):
        """Connect to the server and return this wrapper."""
        await self.client.connect()
        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        """Unsubscribe all handles, delete the subscriptions and disconnect.

        The disconnect runs in a ``finally`` clause so the connection is
        closed even when tearing down a subscription raises; that error
        still propagates to the caller. Returns ``None``, so an exception
        raised inside the ``async with`` body is not suppressed.
        """
        try:
            for sub, handles in self.subscriptions.items():
                for handle in handles:
                    await sub.unsubscribe(handle)
                await sub.delete()
        finally:
            # The recorded subscriptions are unusable past this point.
            self.subscriptions.clear()
            await self.client.disconnect()

    async def init(self):
        """Subscribe to events on the example server's condition and alarm objects.

        Resolves ``NotifierObject`` below the Objects node in the
        ``http://examples.freeopcua.github.io`` namespace, then its
        ``ConditionObject`` and ``AlarmObject`` children, and subscribes to
        events on both through a single subscription whose handler is a
        ``SubHandler``. The subscription and both handles are recorded in
        ``self.subscriptions`` so ``__aexit__`` can remove them.
        """
        objects = self.client.get_objects_node()
        idx = await self.client.get_namespace_index("http://examples.freeopcua.github.io")

        noti_obj = await objects.get_child(['%s:NotifierObject' % idx])

        con_obj = await noti_obj.get_child(['%s:ConditionObject' % idx])
        # NOTE(review): 2830 is presumably a standard condition/event type
        # node id -- confirm against the OPC UA NodeId table.
        condition = self.client.get_node(ua.NodeId(2830))

        # NOTE(review): 500 is presumably the publishing period in
        # milliseconds -- confirm against create_subscription's docs.
        sub = await self.client.create_subscription(500, SubHandler())
        handles = self.subscriptions[sub] = []
        handles.append(await sub.subscribe_events(con_obj, condition))

        alarm_obj = await noti_obj.get_child(['%s:AlarmObject' % idx])
        # NOTE(review): 10637 is presumably a standard alarm type node id
        # -- confirm against the OPC UA NodeId table.
        alarm = self.client.get_node(ua.NodeId(10637))

        handles.append(await sub.subscribe_events(alarm_obj, alarm))
59
60
61
async def interactive(client):
    """Run a minimal console: read a line, evaluate it, repeat until ``exit``.

    Each line is echoed and evaluated as a Python expression in this
    function's scope, so the expression can refer to ``client``. When the
    expression yields a coroutine or future (as calls to async client
    methods do), it is awaited so the call actually runs.

    Args:
        client: Object exposed to the evaluated expressions under the
            name ``client``.

    Raises:
        Exception: Whatever the evaluated expression (or the awaited
            result) raised; it is printed and then re-raised unchanged,
            which ends the console.
    """
    while True:
        # exit to disconnect
        line = await ainput(">>> ")
        print('execute:', line)
        if line == 'exit':
            break
        try:
            # SECURITY: eval() executes arbitrary code typed at the prompt.
            # Acceptable for a local debugging console only; never feed it
            # input from an untrusted source.
            result = eval(line)
            # Without this await a coroutine would be created and dropped,
            # i.e. the requested async call would silently never happen.
            if asyncio.iscoroutine(result) or asyncio.isfuture(result):
                await result
        except Exception as msg:
            print('Exception:', msg)
            # Re-raise the original error so its type, message and
            # traceback are kept instead of a bare generic Exception.
            raise
73
74
75
async def start(endpoint="opc.tcp://localhost:4840"):
    """Connect to a server, set up the subscriptions and run the console.

    Args:
        endpoint: Server URL handed to ``OpcUaClient``. Defaults to the
            local example server address the script always used.

    The ``async with`` block guarantees ``OpcUaClient.__aexit__`` runs when
    the console ends, whether normally or through an exception.
    """
    async with OpcUaClient(endpoint) as client:
        await client.init()
        await interactive(client)
79
80
81
if __name__ == '__main__':
    # asyncio.run() creates a fresh event loop, runs start() to completion,
    # propagates any exception it raises, then shuts down async generators
    # and closes the loop. The previous create_task() + run_forever() pair
    # kept the process alive after start() had finished, because nothing
    # ever stopped the loop.
    asyncio.run(start())
89