Passed
Pull Request — master (#101)
by
unknown
06:41
created

SubHandler.datachange_notification()   A

Complexity

Conditions 1

Size

Total Lines 2
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 2
nop 4
dl 0
loc 2
rs 10
c 0
b 0
f 0
1
# coding: utf-8
2
3
import asyncio
4
5
from asyncua import Client, ua
6
7
8
class SubHandler(object):
9
10
    def datachange_notification(self, node, val, data):
11
        print("Python: New data change event", node, val, data)
12
13
    def event_notification(self, event):
14
        print("Python new event:", event)
15
16
17
class OpcUaClient(object):
18
19
    def __init__(self, endpoint):
20
        self.client = Client(endpoint)
21
22
    async def init(self, ):
23
        await self.client.connect()
24
        objects = self.client.get_objects_node()
25
        idx = await self.client.get_namespace_index("http://examples.freeopcua.github.io")
26
27
        path = ['%s:ConditionObject' % idx]
28
        con_obj = await objects.get_child(path)
29
        condition = self.client.get_node(ua.NodeId(2830))
30
31
        handler = SubHandler()
32
        sub = await self.client.create_subscription(500, handler)
33
        con_handle = await sub.subscribe_events(con_obj, condition)
34
35
        path = ['%s:AlarmObject' % idx]
36
        alarm_obj = await objects.get_child(path)
37
        alarm = self.client.get_node(ua.NodeId(10637))
38
39
        alarm_handle = await sub.subscribe_events(alarm_obj, alarm)
40
41
42
async def start():
43
    client = OpcUaClient("opc.tcp://localhost:4840")
44
    await client.init()
45
46
47
if __name__ == '__main__':
48
    loop = asyncio.get_event_loop()
49
    loop.create_task(start())
50
    loop.run_forever()
51