Passed
Push — dev ( b98292 )
by Olivier
02:18
created

opcua.Server.start()   A

Complexity

Conditions 1

Size

Total Lines 8

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 5
CRAP Score 1
Metric Value
dl 0
loc 8
ccs 5
cts 5
cp 1
rs 9.4286
cc 1
crap 1
1
"""
2
High level interface to pure python OPC-UA server
3
"""
4
5 1
import logging
6 1
try:
7 1
    from urllib.parse import urlparse
8
except ImportError:
9
    from urlparse import urlparse
10
11
12 1
from opcua import ua
13
#from opcua.binary_server import BinaryServer
14 1
from opcua.binary_server_asyncio import BinaryServer
15 1
from opcua.internal_server import InternalServer
16 1
from opcua import Node, Subscription, ObjectIds, Event
17 1
from opcua import xmlimporter
18 1
from opcua import Client
19
20
21 1
class Server(object):
22
23
    """
24
    High level Server class
25
    Create an opcua server with default values
26
    The class is very short. Users are advised to read the code.
27
    Create your own namespace and then populate your server address space
28
    using get_root_node() or get_objects_node() to get Node objects
29
    and get_event_object() to fire events.
30
    Then start server. See example_server.py
31
    All methods are threadsafe
32
33
34
    :ivar application_uri:
35
    :vartype application_uri: uri
36
    :ivar product_uri:
37
    :vartype product_uri: uri
38
    :ivar name:
39
    :vartype name: string
40
    :ivar default_timeout: timeout in milliseconds for sessions and secure channel
41
    :vartype default_timeout: int
42
    :ivar iserver: internal server object
43
    :vartype iserver: InternalServer
44
    :ivar bserver: binary protocol server
45
    :vartype bserver: BinaryServer
46
47
    """
48
49 1
    def __init__(self):
        """
        Create a server with default settings.
        Sets the default endpoint, identification strings, application type
        and timeout, creates the internal server, then registers the
        application uri as a namespace and writes it to the ServerArray node.
        """
        self.logger = logging.getLogger(__name__)

        # identification of this application
        self.name = "FreeOpcUa Python Server"
        self.application_uri = "urn:freeopcua:python:server"
        self.product_uri = "urn:freeopcua.github.no:python:server"
        self.application_type = ua.ApplicationType.ClientAndServer

        # network and session defaults
        self.endpoint = urlparse("opc.tcp://0.0.0.0:4841/freeopcua/server/")
        self.default_timeout = 3600000

        # discovery registration state, see register_to_discovery()
        self._discovery_clients = {}
        self._discovery_period = 60

        # the binary server is only created in start()
        self.iserver = InternalServer()
        self.bserver = None

        # setup some expected values
        self.register_namespace(self.application_uri)
        server_array = self.get_node(ua.NodeId(ua.ObjectIds.Server_ServerArray))
        server_array.set_value([self.application_uri])
66
67 1
    def disable_clock(self, val=True):
68
        """
69
        for debugging you may want to disable clock that write every second
70
        to address space
71
        """
72
        self.iserver.disabled_clock = val
73
74 1
    def set_application_uri(self, uri):
75
        """
76
        Set application/server URI.
77
        This uri is supposed to be unique. If you intent to register
78
        your server to a discovery server, it really should be unique in
79
        your system!
80
        default is : "urn:freeopcua:python:server"
81
        """
82 1
        self.application_uri = uri
83
84 1
    def find_servers(self, uris=None):
        """
        Ask the internal server for the servers it knows about.
        Mainly implemented for symmetry with the client.
        uris is an optional list of server uris passed as ServerUris;
        an empty list is sent when it is omitted.
        """
        request = ua.FindServersParameters()
        request.EndpointUrl = self.endpoint.geturl()
        request.ServerUris = [] if uris is None else uris
        return self.iserver.find_servers(request)
94
95 1
    def register_to_discovery(self, url="opc.tcp://localhost:4840", period=60):
        """
        Register to an OPC-UA Discovery server. Registering must be renewed at
        least every 10 minutes, so this method will use our asyncio thread to
        re-register every period seconds.
        A client already registered for url is disconnected and replaced.
        The new client is only remembered once connecting and registering
        succeeded; exceptions raised by the client are propagated.
        """
        previous = self._discovery_clients.pop(url, None)
        if previous is not None:
            previous.disconnect()

        client = Client(url)
        client.connect()
        try:
            client.register_server(self)
        except Exception:
            # do not keep a connection open to a server we failed to register with
            client.disconnect()
            raise

        # store the client only now, so that periodic renewal and stop()
        # never see a client that failed to connect or register
        self._discovery_clients[url] = client
        self._discovery_period = period
        # NOTE(review): every call schedules a renewal, so registering to
        # several urls starts several renewal chains.
        self.iserver.loop.call_soon(self._renew_registration)
108
109 1
    def _renew_registration(self):
110 1
        for client in self._discovery_clients.values():
111 1
            client.register_server(self)
112 1
            self.iserver.loop.call_later(self._discovery_period, self._renew_registration)
113
114 1
    def get_client_to_discovery(self, url="opc.tcp://localhost:4840"):
        """
        Create a client for the discovery server at url, connect it
        and return it. The client is not stored on the server object.
        """
        discovery_client = Client(url)
        discovery_client.connect()
        return discovery_client
121
122 1
    def allow_remote_admin(self, allow):
123
        """
124
        Enable or disable the builtin Admin user from network clients
125
        """
126
        self.iserver.allow_remote_admin = allow
127
128 1
    def set_endpoint(self, url):
129 1
        self.endpoint = urlparse(url)
130
131 1
    def get_endpoints(self):
132 1
        return self.iserver.get_endpoints()
133
134 1
    def _setup_server_nodes(self):
135
        # to be called just before starting server since it needs all parameters to be setup
136 1
        self._set_endpoints()
137
138 1
    def _set_endpoints(self):
        """
        Build the endpoint description of this server and add it to the
        internal server.
        The endpoint uses no message security and advertises the anonymous,
        certificate (basic256 and basic128) and username user token policies.
        Application name, uris and type are read from the server attributes,
        so they must be set before this method is called.
        """
        token_specs = (
            ('anonymous', ua.UserTokenType.Anonymous),
            ('certificate_basic256', ua.UserTokenType.Certificate),
            ('certificate_basic128', ua.UserTokenType.Certificate),
            # the username policy was built but never advertised before
            ('username', ua.UserTokenType.UserName),
        )
        idtokens = []
        for policy_id, token_type in token_specs:
            idtoken = ua.UserTokenPolicy()
            idtoken.PolicyId = policy_id
            idtoken.TokenType = token_type
            idtokens.append(idtoken)

        appdesc = ua.ApplicationDescription()
        appdesc.ApplicationName = ua.LocalizedText(self.name)
        appdesc.ApplicationUri = self.application_uri
        appdesc.ApplicationType = self.application_type
        appdesc.ProductUri = self.product_uri
        appdesc.DiscoveryUrls.append(self.endpoint.geturl())

        edp = ua.EndpointDescription()
        edp.EndpointUrl = self.endpoint.geturl()
        edp.Server = appdesc
        edp.SecurityMode = ua.MessageSecurityMode.None_
        edp.SecurityPolicyUri = 'http://opcfoundation.org/UA/SecurityPolicy#None'
        edp.UserIdentityTokens = idtokens
        edp.TransportProfileUri = 'http://opcfoundation.org/UA-Profile/Transport/uatcp-uasc-uabinary'
        edp.SecurityLevel = 0
        self.iserver.add_endpoint(edp)
171
172 1
    def set_server_name(self, name):
173
        self.name = name
174
175 1
    def start(self):
        """
        Start to listen on network.
        Declares the endpoints, starts the internal server, then creates and
        starts the binary server on the host and port of the endpoint.
        If the binary server cannot be created or started, the internal
        server is stopped again and the exception is re-raised.
        """
        self._setup_server_nodes()
        self.iserver.start()
        try:
            self.bserver = BinaryServer(self.iserver, self.endpoint.hostname, self.endpoint.port)
            self.bserver.start()
        except Exception:
            # do not leave the internal server running when the network side failed
            self.iserver.stop()
            raise
183
184 1
    def stop(self):
185
        """
186
        Stop server
187
        """
188 1
        for client in self._discovery_clients.values():
189 1
            client.disconnect()
190 1
        self.bserver.stop()
191 1
        self.iserver.stop()
192
193 1
    def get_root_node(self):
        """
        Return the Root folder of the server as a Node object.
        """
        root_id = ua.TwoByteNodeId(ObjectIds.RootFolder)
        return self.get_node(root_id)
198
199 1
    def get_objects_node(self):
        """
        Return the Objects folder of the server as a Node object.
        """
        objects_id = ua.TwoByteNodeId(ObjectIds.ObjectsFolder)
        return self.get_node(objects_id)
204
205 1
    def get_server_node(self):
        """
        Return the Server node of the server as a Node object.
        """
        server_id = ua.TwoByteNodeId(ObjectIds.Server)
        return self.get_node(server_id)
210
211 1
    def get_node(self, nodeid):
        """
        Return a Node object for nodeid, bound to the internal session.
        nodeid is a NodeId object or a string representing a NodeId.
        """
        session = self.iserver.isession
        return Node(session, nodeid)
216
217 1
    def create_subscription(self, period, handler):
        """
        Create a subscription on the internal session.
        period is used as the requested publishing interval and handler is
        handed to the Subscription object.
        Returns a Subscription object which allows subscribing to events
        or data on the server.
        """
        sub_params = ua.CreateSubscriptionParameters()
        sub_params.RequestedPublishingInterval = period
        sub_params.RequestedLifetimeCount = 3000
        sub_params.RequestedMaxKeepAliveCount = 10000
        sub_params.MaxNotificationsPerPublish = 0
        sub_params.PublishingEnabled = True
        sub_params.Priority = 0
        session = self.iserver.isession
        return Subscription(session, sub_params, handler)
231
232 1
    def get_namespace_array(self):
        """
        Return the value of the NamespaceArray node, i.e. all the
        namespaces defined in the server.
        """
        array_id = ua.NodeId(ua.ObjectIds.Server_NamespaceArray)
        return self.get_node(array_id).get_value()
238
239 1
    def register_namespace(self, uri):
        """
        Register a new namespace. Nodes should be in a custom namespace, not 0.
        Returns the index of the namespace. When uri is already registered
        its existing index is returned and the namespace array is left
        untouched, so a uri never appears twice.
        """
        ns_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
        uries = ns_node.get_value()
        if uri in uries:
            # a duplicate entry could never be found by get_namespace_index()
            return uries.index(uri)
        uries.append(uri)
        ns_node.set_value(uries)
        return len(uries) - 1
248
249 1
    def get_namespace_index(self, uri):
250
        """
251
        get index of a namespace using its uri
252
        """
253 1
        uries = self.get_namespace_array()
254 1
        return uries.index(uri)
255
256 1
    def get_event_object(self, etype=ObjectIds.BaseEventType, source=ObjectIds.Server):
        """
        Return an Event object bound to the internal session, built from
        an event type of the address space and a source.
        Use this object to fire events.
        """
        session = self.iserver.isession
        return Event(session, etype, source)
262
263 1
    def import_xml(self, path):
        """
        Import the nodes defined in the xml file at path, using the
        node management service of the internal server.
        """
        node_service = self.iserver.node_mgt_service
        xmlimporter.XmlImporter(node_service).import_xml(path)
269