1
|
|
|
import sys |
2
|
|
|
import asyncio |
3
|
|
|
import logging |
4
|
|
|
import time |
5
|
|
|
|
6
|
|
|
# import asyncua |
7
|
|
|
sys.path.insert(0, "..") |
8
|
|
|
|
9
|
|
|
from asyncua import Server, ua |
10
|
|
|
from asyncua.client.ha.ha_client import HaClient, HaMode, HaConfig |
11
|
|
|
|
12
|
|
|
|
13
|
|
|
# set up logging |
14
|
|
|
root = logging.getLogger() |
15
|
|
|
root.setLevel(logging.DEBUG) |
16
|
|
|
handler = logging.StreamHandler(sys.stdout) |
17
|
|
|
handler.setLevel(logging.DEBUG) |
18
|
|
|
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') |
19
|
|
|
handler.setFormatter(formatter) |
20
|
|
|
root.addHandler(handler) |
21
|
|
|
# diable logging for the servers |
22
|
|
|
logging.getLogger("asyncua.server").setLevel(logging.WARNING) |
23
|
|
|
|
24
|
|
|
|
25
|
|
|
class SubHandler: |
26
|
|
|
""" |
27
|
|
|
Basic subscription handler to support datachange_notification. |
28
|
|
|
No need to implement the other handlermethods since the |
29
|
|
|
HA_CLIENT only supports datachange for now. |
30
|
|
|
""" |
31
|
|
|
|
32
|
|
|
def datachange_notification(self, node, val, data): |
33
|
|
|
""" |
34
|
|
|
called for every datachange notification from server |
35
|
|
|
""" |
36
|
|
|
print(f"Node: {node} has value: {val}\n") |
37
|
|
|
|
38
|
|
|
|
39
|
|
|
async def start_servers(): |
40
|
|
|
""" Spin up two servers with identical configurations """ |
41
|
|
|
ports = [4840, 4841] |
42
|
|
|
urls = [] |
43
|
|
|
loop = asyncio.get_event_loop() |
44
|
|
|
for port in ports: |
45
|
|
|
server = Server() |
46
|
|
|
await server.init() |
47
|
|
|
url = f"opc.tcp://0.0.0.0:{port}/freeopcua/server/" |
48
|
|
|
urls.append(url) |
49
|
|
|
server.set_endpoint(url) |
50
|
|
|
server.set_server_name("FreeOpcUa Example Server {port}") |
51
|
|
|
# setup our own namespace |
52
|
|
|
uri = "http://examples.freeopcua.github.io" |
53
|
|
|
idx = await server.register_namespace(uri) |
54
|
|
|
|
55
|
|
|
myobj = await server.nodes.objects.add_object(idx, "MyObject") |
56
|
|
|
myvar = await myobj.add_variable(idx, "MyVariable", 6.7) |
57
|
|
|
await server.start() |
58
|
|
|
loop.create_task(server_var_update(server, myvar)) |
59
|
|
|
return urls, myvar |
|
|
|
|
60
|
|
|
|
61
|
|
|
|
62
|
|
|
async def server_var_update(server, myvar): |
63
|
|
|
""" |
64
|
|
|
Constantly increment the variable with epoch time |
65
|
|
|
to simulate data notifications. |
66
|
|
|
""" |
67
|
|
|
while True: |
68
|
|
|
await asyncio.sleep(1) |
69
|
|
|
await server.write_attribute_value(myvar.nodeid, ua.DataValue(time.time())) |
70
|
|
|
|
71
|
|
|
|
72
|
|
|
async def main(): |
73
|
|
|
# start the servers |
74
|
|
|
urls, node = await start_servers() |
75
|
|
|
|
76
|
|
|
# set up ha_client with the serveur urls |
77
|
|
|
ha_config = HaConfig( |
78
|
|
|
HaMode.WARM, |
79
|
|
|
keepalive_timer=15, |
80
|
|
|
manager_timer=15, |
81
|
|
|
reconciliator_timer=15, |
82
|
|
|
urls=urls, |
83
|
|
|
session_timeout=30 |
84
|
|
|
) |
85
|
|
|
ha = HaClient(ha_config) |
86
|
|
|
await ha.start() |
87
|
|
|
|
88
|
|
|
publish_interval = 1000 |
89
|
|
|
handler = SubHandler() |
90
|
|
|
|
91
|
|
|
# subscribe to two nodes |
92
|
|
|
sub1 = await ha.create_subscription(publish_interval, handler) |
93
|
|
|
await ha.subscribe_data_change(sub1, [node]) |
94
|
|
|
|
95
|
|
|
# Watch the debug log and check what's happening in the background. |
96
|
|
|
# A basic check could be to `iptables -A OUTPUT -p tcp --dport 4840 -j DROP` |
97
|
|
|
# and observe the failover in action |
98
|
|
|
await asyncio.sleep(60) |
99
|
|
|
|
100
|
|
|
|
101
|
|
|
if __name__ == "__main__": |
102
|
|
|
logging.basicConfig(level=logging.INFO) |
103
|
|
|
asyncio.run(main()) |
104
|
|
|
|