Passed
Push — master ( 706a1c...31db04 )
by Olivier
04:38 queued 01:40
created

server-datavalue-history   A

Complexity

Total Complexity 2

Size/Duplication

Total Lines 58
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 32
dl 0
loc 58
rs 10
c 0
b 0
f 0
wmc 2

1 Function

Rating   Name   Duplication   Size   Complexity  
A main() 0 42 2
1
import asyncio
2
import sys
3
sys.path.insert(0, "..")
4
import math
5
6
7
from asyncua import ua, Server
8
from asyncua.server.history_sql import HistorySQLite
9
10
11
async def main():
    """Run an example OPC UA server that historizes one variable.

    The server stores history through ``HistorySQLite``, exposes a single
    writable Double variable, and then updates that variable with a sine
    value roughly once per second until the coroutine is interrupted.
    The server is stopped on the way out, whatever ended the loop.
    """
    opc_server = Server()

    # Swap the history backend for the SQLite-backed one before the
    # server is initialized.
    history_store = HistorySQLite("my_datavalue_history.sql")
    opc_server.iserver.history_manager.set_storage(history_store)

    await opc_server.init()

    opc_server.set_endpoint("opc.tcp://0.0.0.0:4840/freeopcua/server/")

    # Register a dedicated namespace for the nodes created below.
    namespace_uri = "http://examples.freeopcua.github.io"
    ns_index = await opc_server.register_namespace(namespace_uri)

    # Custom nodes are attached underneath the Objects node.
    objects_node = opc_server.get_objects_node()

    # Build the address space: one object holding one Double variable.
    container = await objects_node.add_object(ns_index, "MyObject")
    initial_value = ua.Variant(0, ua.VariantType.Double)
    signal = await container.add_variable(ns_index, "MyVariable", initial_value)
    await signal.set_writable()  # let clients write to MyVariable
    print(signal)

    await opc_server.start()

    # Historizing relies on a subscription, so it has to be enabled only
    # once the server has been started.
    await opc_server.historize_node_data_change(signal, period=None, count=100)

    try:
        angle = 0
        while True:
            await asyncio.sleep(1)
            angle += 0.1
            await signal.set_value(math.sin(angle))
    finally:
        # Always shut the server down, however the loop was left.
        await opc_server.stop()
53
54
if __name__ == "__main__":
    # asyncio.run() creates a fresh event loop, runs main() to completion and
    # closes the loop afterwards. It replaces the deprecated pattern of
    # fetching a loop with asyncio.get_event_loop() outside a running
    # coroutine and never closing it. debug=True keeps the loop in debug
    # mode, as the previous loop.set_debug(True) call did.
    asyncio.run(main(), debug=True)
58