Passed
Pull Request — master (#101)
by
unknown
02:37
created

SubHandler.datachange_notification()   A

Complexity

Conditions 1

Size

Total Lines 2
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 2
nop 4
dl 0
loc 2
rs 10
c 0
b 0
f 0
1
# coding: utf-8
2
3
from aioconsole import ainput
4
5
import asyncio
6
7
from asyncua import Client, ua
8
9
10
class SubHandler(object):
    """Subscription callback object that echoes notifications to stdout.

    Both methods only print their arguments; nothing is stored or returned.
    """

    def datachange_notification(self, node, val, data):
        """Print a data-change notification.

        Args:
            node, val, data: Values supplied with the notification; they are
                printed as-is, in this order.
        """
        print("Python: New data change event", node, val, data)

    def event_notification(self, event):
        """Print an event notification.

        Args:
            event: The event object supplied with the notification.
        """
        print("Python new event:", event)
17
18
19
class OpcUaClient(object):
    """Async context manager wrapping an asyncua ``Client``.

    Entering the context connects the client. Leaving it removes every
    subscription recorded in ``self.subscriptions`` and then disconnects.
    """

    def __init__(self, endpoint):
        """Create the underlying client for ``endpoint`` (not yet connected)."""
        self.client = Client(endpoint)
        # Maps each subscription object to the list of handles created on it.
        self.subscriptions = {}

    async def __aenter__(self):
        """Connect the client and return this wrapper."""
        await self.client.connect()
        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        """Tear down all recorded subscriptions, then disconnect.

        The disconnect runs in a ``finally`` clause so that a failure while
        unsubscribing or deleting a subscription cannot leave the connection
        open. Such a failure still propagates to the caller.
        """
        try:
            # Iterate over a snapshot so the dict can be cleared afterwards.
            for sub, handles in list(self.subscriptions.items()):
                for handle in handles:
                    await sub.unsubscribe(handle)
                await sub.delete()
        finally:
            self.subscriptions.clear()
            await self.client.disconnect()

    async def init(self):
        """Subscribe to events on the ``ConditionObject`` and ``AlarmObject`` nodes.

        Both nodes are looked up below the objects node, in the namespace
        "http://examples.freeopcua.github.io". One subscription is created
        and both event handles are recorded in ``self.subscriptions`` so that
        ``__aexit__`` can remove them.
        """
        objects = self.client.get_objects_node()
        idx = await self.client.get_namespace_index("http://examples.freeopcua.github.io")

        # NOTE(review): 2830 and 10637 are bare numeric node ids; presumably
        # they identify standard event-type nodes - confirm against the
        # OPC UA NodeId table before changing them.
        con_obj = await objects.get_child([f"{idx}:ConditionObject"])
        condition = self.client.get_node(ua.NodeId(2830))

        handler = SubHandler()
        # First argument is 500, passed through to create_subscription.
        sub = await self.client.create_subscription(500, handler)
        # Register the subscription before subscribing so it is cleaned up
        # even if one of the subscribe_events calls below fails.
        self.subscriptions[sub] = []
        con_handle = await sub.subscribe_events(con_obj, condition)
        self.subscriptions[sub].append(con_handle)

        alarm_obj = await objects.get_child([f"{idx}:AlarmObject"])
        alarm = self.client.get_node(ua.NodeId(10637))

        alarm_handle = await sub.subscribe_events(alarm_obj, alarm)
        self.subscriptions[sub].append(alarm_handle)
56
57
58
async def interactive(client):
    """Run a minimal prompt loop that evaluates each entered line.

    Each line is read with ``ainput``, echoed, and passed to ``eval``.
    Entering ``exit`` ends the loop. If evaluation raises, the error is
    printed and then re-raised unchanged, which ends the loop as well.

    Args:
        client: Not used directly; it is a local name, so expressions typed
            at the prompt can refer to it as ``client``.

    Raises:
        Exception: Whatever the evaluated expression raised.
    """
    while True:
        # exit to disconnect
        line = await ainput(">>> ")
        print('execute:', line)
        if line == 'exit':
            break
        try:
            # SECURITY: eval() executes arbitrary code typed at the prompt.
            # Only acceptable for a local, trusted debugging console.
            eval(line)
        except Exception as msg:
            print('Exception:', msg)
            # Re-raise the original error (type, message and traceback)
            # instead of replacing it with an empty ``Exception``.
            raise
70
71
72
async def start(endpoint="opc.tcp://localhost:4840"):
    """Connect, set up the event subscriptions and run the prompt loop.

    Args:
        endpoint: Server URL passed to ``OpcUaClient``. Defaults to the
            previously hard-coded "opc.tcp://localhost:4840".
    """
    async with OpcUaClient(endpoint) as client:
        await client.init()
        await interactive(client)
76
77
78
if __name__ == '__main__':
    # asyncio.run() returns once start() completes, so the process exits
    # after the prompt loop ends instead of idling in run_forever(). It also
    # propagates any exception raised by start() and handles async-generator
    # shutdown and loop closing itself.
    asyncio.run(start())
86