1 | import os.path |
||
try:
    from IPython import embed
except ImportError:
    import code

    def embed():
        """Open a plain interactive console when IPython is not installed.

        The console runs against this module's global namespace, so every
        name defined at module level is reachable from the prompt.
        """
        # Pass the live module globals directly. Merging ``locals()`` into
        # them would only add this function's own temporaries to the module
        # namespace, and naming the dict ``vars`` would shadow the builtin.
        console = code.InteractiveConsole(globals())
        console.interact()
||
12 | |||
13 | from asyncua import ua, uamethod, Server |
||
14 | |||
15 | |||
@uamethod
def say_hello_xml(parent, happy):
    """Callback that HelloServer links to the existing ``SayHello`` node.

    Prints a trace line and the selected greeting, then returns the
    greeting. ``parent`` is accepted but not used here.
    """
    print("Calling say_hello_xml")
    greeting = "I'm happy" if happy else "I'm not happy"
    print(greeting)
    return greeting
||
25 | |||
26 | |||
@uamethod
def say_hello(parent, happy):
    """Return (and print) a greeting that depends on the truthiness of ``happy``.

    ``parent`` is accepted but not used here.
    """
    greeting = "I'm happy" if happy else "I'm not happy"
    print(greeting)
    return greeting
||
35 | |||
36 | |||
@uamethod
def say_hello_array(parent, happy):
    """Print a greeting and return it in a two-element list.

    The first element depends on the truthiness of ``happy``; the second is
    always ``"Actually I am"``. ``parent`` is accepted but not used here.
    """
    greeting = "I'm happy" if happy else "I'm not happy"
    print(greeting)
    reply = [greeting, "Actually I am"]
    return reply
||
45 | |||
46 | |||
class HelloServer:
    """Context manager around an OPC UA server exposing the ``Hellower`` methods.

    The constructor configures the server and registers the method callbacks;
    entering the context starts the server and leaving it stops the server.
    """

    def __init__(self, endpoint, name, model_filepath):
        """Configure the server without starting it.

        Args:
            endpoint: Endpoint URL handed to ``set_endpoint``.
            name: Server name handed to ``set_server_name``.
            model_filepath: Path of the XML model handed to ``import_xml``.
        """
        # Every call below is made synchronously, but ``asyncua.Server``
        # implements import_xml/get_namespace_index/get_child/add_method/
        # start/stop as coroutines. Calling them without ``await`` yields
        # coroutine objects instead of results, so use the blocking wrapper.
        from asyncua.sync import Server as SyncServer

        self.server = SyncServer()

        # This needs to be imported at the start or else it will overwrite
        # the data.
        self.server.import_xml(model_filepath)

        self.server.set_endpoint(endpoint)
        self.server.set_server_name(name)

        objects = self.server.nodes.objects

        freeopcua_namespace = self.server.get_namespace_index(
            "urn:freeopcua:python:server")
        hellower = objects.get_child("0:Hellower")
        hellower_say_hello = hellower.get_child("0:SayHello")

        # The SayHello node already exists in the imported model; only its
        # callback has to be attached.
        self.server.link_method(hellower_say_hello, say_hello_xml)

        # The remaining methods are created here: one Boolean input and one
        # String output each.
        hellower.add_method(
            freeopcua_namespace, "SayHello2", say_hello,
            [ua.VariantType.Boolean], [ua.VariantType.String])

        hellower.add_method(
            freeopcua_namespace, "SayHelloArray", say_hello_array,
            [ua.VariantType.Boolean], [ua.VariantType.String])

    def __enter__(self):
        """Start the server and return the underlying server object."""
        self.server.start()
        return self.server

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Stop the server; exceptions from the ``with`` body propagate."""
        self.server.stop()
||
77 | |||
78 | |||
if __name__ == '__main__':
    # Locate the XML model next to this script instead of relying on the
    # current working directory.
    script_dir = os.path.dirname(__file__)
    model_path = os.path.join(script_dir, "test_saying.xml")

    hello_server = HelloServer(
        "opc.tcp://0.0.0.0:40840/freeopcua/server/",
        "FreeOpcUa Example Server",
        model_path,
    )
    # Keep the server running while the interactive shell is open; it is
    # stopped when the shell exits.
    with hello_server as server:
        embed()
||
86 |