Completed
Pull Request — master (#182)
by
unknown
02:56
created

server-with-encryption   A

Complexity

Total Complexity 2

Size/Duplication

Total Lines 55
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 37
dl 0
loc 55
rs 10
c 0
b 0
f 0
wmc 2

1 Function

Rating   Name   Duplication   Size   Complexity  
A main() 0 37 2
1
import asyncio
2
import sys
3
import logging
4
sys.path.insert(0, "..")
5
from asyncua.crypto.certificate_handler import CertificateHandler
6
from asyncua import Server
7
from asyncua import ua
8
9
# Configure the root logger so INFO-level (and higher) messages are emitted.
logging.basicConfig(level=logging.INFO)
10
11
12
async def main():
    """Run an example OPC UA server that uses a sign-and-encrypt policy.

    The coroutine:

    * creates a ``Server`` and a ``CertificateHandler`` and marks
      ``certificate-example.der`` as trusted;
    * binds the endpoint ``opc.tcp://0.0.0.0:4840/freeopcua/server/`` and
      selects the ``Basic256Sha256_SignAndEncrypt`` security policy;
    * loads the server certificate (``my_cert.der``) and private key
      (``my_private_key.pem``);
    * registers a namespace and adds ``MyObject`` with a writable
      ``MyVariable`` (initial value ``6.7``) under the Objects node;
    * starts the server and then, every 0.1 s, adds 0.1 to a counter and
      writes it to ``MyVariable``.

    The update loop never exits on its own; the server is stopped in a
    ``finally`` clause when the loop is left through an exception or
    cancellation.
    """
    opc_server = Server()
    trust_store = CertificateHandler()
    await opc_server.init()
    await trust_store.trust_certificate("certificate-example.der")

    opc_server.set_endpoint("opc.tcp://0.0.0.0:4840/freeopcua/server/")
    policies = [ua.SecurityPolicyType.Basic256Sha256_SignAndEncrypt]
    opc_server.set_security_policy(policies, certificate_handler=trust_store)

    # The server certificate and private key are what enable endpoints
    # with signing and encryption.
    await opc_server.load_certificate("my_cert.der")
    await opc_server.load_private_key("my_private_key.pem")

    # Register our own namespace: not strictly necessary, but the spec
    # recommends it.
    namespace_uri = "http://examples.freeopcua.github.io"
    ns_index = await opc_server.register_namespace(namespace_uri)

    # Custom nodes belong under the Objects node.
    objects_node = opc_server.get_objects_node()

    # Populate the address space.
    example_object = await objects_node.add_object(ns_index, "MyObject")
    example_variable = await example_object.add_variable(ns_index, "MyVariable", 6.7)
    await example_variable.set_writable()  # clients may write MyVariable

    # Start serving.
    await opc_server.start()
    try:
        tick = 0.1
        current = 0
        while True:
            await asyncio.sleep(tick)
            current += tick
            await example_variable.write_value(current)
    finally:
        # Close connections, remove subscriptions, etc.
        await opc_server.stop()
49
50
51
if __name__ == "__main__":
    # asyncio.run() creates a new event loop, runs main() until it finishes
    # and always closes the loop afterwards. It replaces the manual
    # get_event_loop()/run_until_complete() pair, which is deprecated when
    # no loop is running and never closed the loop.
    # debug=True runs the loop in asyncio debug mode.
    asyncio.run(main(), debug=True)
56