Completed
Pull Request — master (#184)
by Olivier
03:02
created

server-events.start_server()   A

Complexity

Conditions 1

Size

Total Lines 26
Code Lines 19

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 19
nop 1
dl 0
loc 26
rs 9.45
c 0
b 0
f 0
1
import asyncio
2
import logging
3
from asyncua import ua
4
from asyncua.server import Server, EventGenerator
5
6
logging.basicConfig(level=logging.INFO)
7
_logger = logging.getLogger('asyncua')
8
9
10
async def main():
11
    server = Server()
12
    await server.init()
13
    server.set_endpoint("opc.tcp://0.0.0.0:4840/freeopcua/server/")
14
    # setup our own namespace, not really necessary but should as spec
15
    uri = "http://examples.freeopcua.github.io"
16
    idx = await server.register_namespace(uri)
17
    # populating our address space
18
    myobj = await server.nodes.objects.add_object(idx, "MyObject")
19
20
    # Creating a custom event: Approach 1
21
    # The custom event object automatically will have members from its parent (BaseEventType)
22
    etype = await server.create_custom_event_type(
23
        idx, 'MyFirstEvent', ua.ObjectIds.BaseEventType,
24
        [('MyNumericProperty', ua.VariantType.Float),
25
         ('MyStringProperty', ua.VariantType.String)]
26
    )
27
    myevgen = await server.get_event_generator(etype, myobj)
28
29
    # Creating a custom event: Approach 2
30
    custom_etype = await server.nodes.base_event_type.add_object_type(2, 'MySecondEvent')
31
    await custom_etype.add_property(2, 'MyIntProperty', ua.Variant(0, ua.VariantType.Int32))
32
    await custom_etype.add_property(2, 'MyBoolProperty', ua.Variant(True, ua.VariantType.Boolean))
33
    mysecondevgen = await server.get_event_generator(custom_etype, myobj)
34
35
36
    async with server:
37
        count = 0
38
        while True:
39
            await asyncio.sleep(1)
40
            myevgen.event.Message = ua.LocalizedText("MyFirstEvent %d" % count)
41
            myevgen.event.Severity = count
42
            myevgen.event.MyNumericProperty = count
43
            myevgen.event.MyStringProperty = "Property %d" % count
44
            myevgen.trigger()
45
46
            mysecondevgen.trigger(message="MySecondEvent %d" % count)
47
48
            count += 1
49
50
51
if __name__ == "__main__":
52
    asyncio.run(main())
53