Issues (20)

examples/sync/server-example.py (1 issue)

1
from threading import Thread
2
import copy
3
import logging
4
from datetime import datetime
5
import time
6
from math import sin
7
import sys
8
sys.path.insert(0, "../..")
9
10
try:
    from IPython import embed
except ImportError:
    # ``code`` is bound here, in the same branch that defines the fallback
    # below, so it is always defined whenever that fallback exists.
    import code

    def embed():
        """Fallback for IPython's ``embed``: run a plain interactive console.

        The console works directly on this module's globals, so names
        defined at module level are available in the shell.
        """
        # The module globals are handed over as-is. Merging ``locals()`` in
        # from inside this function would only add a self-referential entry
        # for the local holding the globals dict.
        shell = code.InteractiveConsole(globals())
        shell.interact()
20
21
22
from asyncua import ua, uamethod
23
from asyncua.sync import Server, ThreadLoop
24
25
26
class SubHandler:
    """
    Subscription handler whose callbacks print the notifications they receive.
    """

    def event_notification(self, event):
        """Print the received event."""
        label = "Python: New event"
        print(label, event)

    def datachange_notification(self, node, val, data):
        """Print the node and its new value; ``data`` is accepted but unused."""
        label = "Python: New data change event"
        print(label, node, val)
37
38
39
# method to be exposed through server
40
41
def func(parent, variant):
    """Report whether ``variant.Value`` is even.

    ``parent`` is accepted but not used. The result is returned as a
    one-element list containing a Boolean ``ua.Variant``.
    """
    is_even = bool(variant.Value % 2 == 0)
    return [ua.Variant(is_even, ua.VariantType.Boolean)]
46
47
48
# method to be exposed through server
49
# uses a decorator to automatically convert to and from variants
50
51
@uamethod
def multiply(parent, x, y):
    """Print both operands and return their product; ``parent`` is unused."""
    print("multiply method call with parameters: ", x, y)
    product = x * y
    return product
55
56
57
class VarUpdater(Thread):
    """Background thread that keeps writing ``sin(time.time() / 10)`` to a variable.

    ``var`` is any object exposing ``write_value(value)``. The loop writes a
    new value roughly every 0.1 seconds until ``stop()`` is called.
    """

    def __init__(self, var):
        # Imported locally because the file-level import only brings in
        # ``Thread`` from ``threading``.
        from threading import Event

        super().__init__()
        self._stopev = Event()
        self.var = var

    def stop(self):
        """Ask the update loop to finish, waking the thread if it is waiting."""
        self._stopev.set()

    def run(self):
        """Write a fresh sine value, then pause, until a stop is requested."""
        while not self._stopev.is_set():
            self.var.write_value(sin(time.time() / 10))
            # Doubles as the 0.1 s pause, but returns immediately once
            # stop() has been called instead of sleeping the interval out.
            self._stopev.wait(0.1)
71
72
73
74
if __name__ == "__main__":
    # Everything below stays at module level on purpose: the fallback
    # embed() exposes the module globals to the interactive shell.

    # optional: setup logging
    logging.basicConfig(level=logging.WARNING)
    #logger = logging.getLogger("opcua.address_space")
    # logger.setLevel(logging.DEBUG)
    #logger = logging.getLogger("opcua.internal_server")
    # logger.setLevel(logging.DEBUG)
    #logger = logging.getLogger("opcua.binary_server_asyncio")
    # logger.setLevel(logging.DEBUG)
    #logger = logging.getLogger("opcua.uaprocessor")
    # logger.setLevel(logging.DEBUG)
    with ThreadLoop() as tloop:
        # now setup our server
        server = Server(tloop=tloop)
        #server.disable_clock()
        #server.set_endpoint("opc.tcp://localhost:4840/freeopcua/server/")
        server.set_endpoint("opc.tcp://0.0.0.0:4840/freeopcua/server/")
        server.set_server_name("FreeOpcUa Example Server")
        # set all possible endpoint policies for clients to connect through
        server.set_security_policy([
            ua.SecurityPolicyType.NoSecurity,
            ua.SecurityPolicyType.Basic256Sha256_SignAndEncrypt,
            ua.SecurityPolicyType.Basic256Sha256_Sign,
        ])

        # setup our own namespace
        uri = "http://examples.freeopcua.github.io"
        idx = server.register_namespace(uri)
        print("IDX", idx)

        # create a new node type we can instantiate in our address space
        dev = server.nodes.base_object_type.add_object_type(idx, "MyDevice")
        dev.add_variable(idx, "sensor1", 1.0).set_modelling_rule(True)
        dev.add_property(idx, "device_id", "0340").set_modelling_rule(True)
        ctrl = dev.add_object(idx, "controller")
        ctrl.set_modelling_rule(True)
        ctrl.add_property(idx, "state", "Idle").set_modelling_rule(True)

        # populating our address space

        # First a folder to organise our nodes
        myfolder = server.nodes.objects.add_folder(idx, "myEmptyFolder")
        # instantiate one instance of our device
        mydevice = server.nodes.objects.add_object(idx, "Device0001", dev)
        mydevice_var = mydevice.get_child([f"{idx}:controller", f"{idx}:state"])  # get proxy to our device state variable
        # create directly some objects and variables
        myobj = server.nodes.objects.add_object(idx, "MyObject")
        myvar = myobj.add_variable(idx, "MyVariable", 6.7)
        mysin = myobj.add_variable(idx, "MySin", 0, ua.VariantType.Float)
        myvar.set_writable()    # Set MyVariable to be writable by clients
        mystringvar = myobj.add_variable(idx, "MyStringVariable", "Really nice string")
        mystringvar.set_writable()    # Set MyStringVariable to be writable by clients
        # NOTE(review): datetime.utcnow() is deprecated since Python 3.12; confirm
        # how the server handles timezone-aware values before replacing it.
        mydtvar = myobj.add_variable(idx, "MyDateTimeVar", datetime.utcnow())
        mydtvar.set_writable()    # Set MyDateTimeVar to be writable by clients
        myarrayvar = myobj.add_variable(idx, "myarrayvar", [6.7, 7.9])
        # Bound to its own name so that myarrayvar keeps referring to the float
        # array that is read, extended and written back further down.
        mytypedvar = myobj.add_variable(idx, "myStronglytTypedVariable", ua.Variant([], ua.VariantType.UInt32))
        myprop = myobj.add_property(idx, "myproperty", "I am a property")
        mymethod = myobj.add_method(idx, "mymethod", func, [ua.VariantType.Int64], [ua.VariantType.Boolean])
        multiply_node = myobj.add_method(idx, "multiply", multiply, [ua.VariantType.Int64, ua.VariantType.Int64], [ua.VariantType.Int64])

        # import some nodes from xml
        server.import_xml("custom_nodes.xml")

        # creating a default event object
        # The event object automatically will have members for all events properties
        # you probably want to create a custom event type, see other examples
        myevgen = server.get_event_generator()
        myevgen.event.Severity = 300

        # starting!
        with server:
            print("Available loggers are: ", logging.Logger.manager.loggerDict.keys())
            vup = VarUpdater(mysin)  # just a simple class that keeps updating a variable
            vup.start()
            try:
                # enable following if you want to subscribe to nodes on server side
                #handler = SubHandler()
                #sub = server.create_subscription(500, handler)
                #handle = sub.subscribe_data_change(myvar)
                # trigger event, all subscribed clients will receive it
                var = myarrayvar.read_value()  # return a ref to value in db server side! not a copy!
                var = copy.copy(var)  # WARNING: we need to copy before writing again otherwise no data change event will be generated
                var.append(9.3)
                myarrayvar.write_value(var)
                mydevice_var.write_value("Running")
                myevgen.trigger(message="This is BaseEvent")
                server.write_attribute_value(myvar.nodeid, ua.DataValue(9.9))  # Server side write method which is a bit faster than using write

                embed()
            finally:
                # Always stop the updater, even if the shell exits with an
                # exception, and wait for it before the server context closes.
                vup.stop()
                vup.join()
163