Passed
Pull Request — master (#367)
by
unknown
02:46
created

ha_client-example.server_var_update()   A

Complexity

Conditions 2

Size

Total Lines 8
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 4
nop 2
dl 0
loc 8
rs 10
c 0
b 0
f 0
1
import sys
2
import asyncio
3
import logging
4
import time
5
6
# import asyncua
7
sys.path.insert(0, "..")
8
9
from asyncua import Server, ua
10
from asyncua.client.ha.ha_client import HaClient, HaMode, HaConfig
11
12
13
# set up logging
14
root = logging.getLogger()
15
root.setLevel(logging.DEBUG)
16
handler = logging.StreamHandler(sys.stdout)
17
handler.setLevel(logging.DEBUG)
18
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
19
handler.setFormatter(formatter)
20
root.addHandler(handler)
21
# diable logging for the servers
22
logging.getLogger("asyncua.server").setLevel(logging.WARNING)
23
24
25
class SubHandler:
26
    """
27
    Basic subscription handler to support datachange_notification.
28
    No need to implement the other handlermethods since the
29
    HA_CLIENT only supports datachange for now.
30
    """
31
32
    def datachange_notification(self, node, val, data):
33
        """
34
        called for every datachange notification from server
35
        """
36
        print(f"Node: {node} has value: {val}\n")
37
38
39
async def start_servers():
40
    """ Spin up two servers with identical configurations """
41
    ports = [4840, 4841]
42
    urls = []
43
    loop = asyncio.get_event_loop()
44
    for port in ports:
45
        server = Server()
46
        await server.init()
47
        url = f"opc.tcp://0.0.0.0:{port}/freeopcua/server/"
48
        urls.append(url)
49
        server.set_endpoint(url)
50
        server.set_server_name("FreeOpcUa Example Server {port}")
51
        # setup our own namespace
52
        uri = "http://examples.freeopcua.github.io"
53
        idx = await server.register_namespace(uri)
54
55
        myobj = await server.nodes.objects.add_object(idx, "MyObject")
56
        myvar = await myobj.add_variable(idx, "MyVariable", 6.7)
57
        await server.start()
58
        loop.create_task(server_var_update(server, myvar))
59
    return urls, myvar
0 ignored issues
show
introduced by
The variable myvar does not seem to be defined in case the for loop on line 44 is not entered. Are you sure this can never be the case?
Loading history...
60
61
62
async def server_var_update(server, myvar):
63
    """
64
    Constantly increment the variable with epoch time
65
    to simulate data notifications.
66
    """
67
    while True:
68
        await asyncio.sleep(1)
69
        await server.write_attribute_value(myvar.nodeid, ua.DataValue(time.time()))
70
71
72
async def main():
73
    # start the servers
74
    urls, node = await start_servers()
75
76
    # set up ha_client with the serveur urls
77
    ha_config = HaConfig(
78
        HaMode.WARM,
79
        keepalive_timer=15,
80
        manager_timer=15,
81
        reconciliator_timer=15,
82
        urls=urls,
83
        session_timeout=30
84
    )
85
    ha = HaClient(ha_config)
86
    await ha.start()
87
88
    publish_interval = 1000
89
    handler = SubHandler()
90
91
    # subscribe to two nodes
92
    sub1 = await ha.create_subscription(publish_interval, handler)
93
    await ha.subscribe_data_change(sub1, [node])
94
95
    # Watch the debug log and check what's happening in the background.
96
    # A basic check could be to `iptables -A OUTPUT -p tcp --dport 4840 -j DROP`
97
    # and observe the failover in action
98
    await asyncio.sleep(60)
99
100
101
if __name__ == "__main__":
102
    logging.basicConfig(level=logging.INFO)
103
    asyncio.run(main())
104