1
|
|
|
import os.path |
2
|
|
|
try:
    from IPython import embed
except ImportError:
    import code

    def embed():
        """Open an interactive console as a stand-in for ``IPython.embed``.

        Defined only when IPython cannot be imported. The console runs
        directly on this module's global namespace, so module-level names
        are available at the prompt.
        """
        # Hand the module globals straight to the console. Merging
        # ``locals()`` into them first (through a local named ``vars``) only
        # added a self-referencing ``vars`` entry to the module namespace,
        # shadowing the builtin ``vars()`` for the module and the console.
        code.InteractiveConsole(globals()).interact()
12
|
|
|
|
13
|
|
|
from asyncua import ua, uamethod, Server |
14
|
|
|
|
15
|
|
|
|
16
|
|
|
@uamethod
def say_hello_xml(parent, happy):
    """Print a call trace and return a mood string chosen by ``happy``.

    :param parent: not used by this function.
    :param happy: truthy selects the happy reply, falsy the unhappy one.
    :return: ``"I'm happy"`` or ``"I'm not happy"``.
    """
    print("Calling say_hello_xml")
    mood = "I'm happy" if happy else "I'm not happy"
    print(mood)
    return mood
25
|
|
|
|
26
|
|
|
|
27
|
|
|
@uamethod
def say_hello(parent, happy):
    """Print and return a mood string chosen by ``happy``.

    :param parent: not used by this function.
    :param happy: truthy selects the happy reply, falsy the unhappy one.
    :return: ``"I'm happy"`` or ``"I'm not happy"``.
    """
    mood = "I'm happy" if happy else "I'm not happy"
    print(mood)
    return mood
35
|
|
|
|
36
|
|
|
|
37
|
|
|
@uamethod
def say_hello_array(parent, happy):
    """Print a mood string and return it in a two-element list.

    :param parent: not used by this function.
    :param happy: truthy selects the happy reply, falsy the unhappy one.
    :return: a list whose first item is ``"I'm happy"`` or
        ``"I'm not happy"`` and whose second item is ``"Actually I am"``.
    """
    mood = "I'm happy" if happy else "I'm not happy"
    print(mood)
    return [mood, "Actually I am"]
45
|
|
|
|
46
|
|
|
|
47
|
|
|
class HelloServer:
    """Context manager wrapping a ``Server`` that exposes the "hello" methods.

    Construction imports an XML model, sets the endpoint and server name,
    links ``say_hello_xml`` to the model's ``SayHello`` node and adds the
    ``SayHello2`` and ``SayHelloArray`` methods. Entering the context starts
    the server; leaving it stops the server.
    """

    # NOTE(review): every server/node call below is made synchronously. In
    # asyncua releases where ``Server.import_xml``, ``Server.start``/``stop``,
    # ``Server.get_namespace_index`` and ``Node.get_child``/``add_method`` are
    # coroutines, they would need to be awaited (or ``asyncua.sync`` used).
    # Confirm against the installed asyncua version.

    def __init__(self, endpoint, name, model_filepath):
        """Build and configure the server.

        :param endpoint: endpoint URL passed to ``Server.set_endpoint``.
        :param name: server name passed to ``Server.set_server_name``.
        :param model_filepath: path of the XML model passed to
            ``Server.import_xml``.
        """
        self.server = Server()

        # The XML model has to be imported before anything else is set up,
        # or else it will overwrite the data.
        self.server.import_xml(model_filepath)

        self.server.set_endpoint(endpoint)
        self.server.set_server_name(name)

        objects_node = self.server.nodes.objects
        ns_index = self.server.get_namespace_index("urn:freeopcua:python:server")
        hellower_node = objects_node.get_child("0:Hellower")

        # Attach the Python callback to the method node found under Hellower.
        xml_method_node = hellower_node.get_child("0:SayHello")
        self.server.link_method(xml_method_node, say_hello_xml)

        # Add the two extra methods in code; both take one Boolean input and
        # declare one String output.
        for method_name, callback in (
            ("SayHello2", say_hello),
            ("SayHelloArray", say_hello_array),
        ):
            hellower_node.add_method(
                ns_index,
                method_name,
                callback,
                [ua.VariantType.Boolean],
                [ua.VariantType.String],
            )

    def __enter__(self):
        """Start the server and return the underlying ``Server`` object."""
        running = self.server
        running.start()
        return running

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Stop the server; exceptions from the ``with`` body are not suppressed."""
        self.server.stop()
77
|
|
|
|
78
|
|
|
|
79
|
|
|
if __name__ == "__main__":
    # Locate the XML model next to this script rather than in the current
    # working directory.
    script_dir = os.path.dirname(__file__)
    with HelloServer(
        endpoint="opc.tcp://0.0.0.0:40840/freeopcua/server/",
        name="FreeOpcUa Example Server",
        model_filepath=os.path.join(script_dir, "test_saying.xml"),
    ) as server:
        # Hold the context open with an interactive shell; the server is
        # stopped when the shell exits and the ``with`` block ends.
        embed()
86
|
|
|
|