Completed
Pull Request — master (#70)
by Olivier
03:30
created

opcua.Server.set_application_uri()   A

Complexity

Conditions 1

Size

Total Lines 9

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1
Metric Value
dl 0
loc 9
ccs 2
cts 2
cp 1
rs 9.6667
cc 1
crap 1
1
"""
2
High level interface to pure python OPC-UA server
3
"""
4
5 1
import logging
6 1
try:
7 1
    from urllib.parse import urlparse
8
except ImportError:
9
    from urlparse import urlparse
10
11
12 1
from opcua import ua
13
#from opcua.binary_server import BinaryServer
14 1
from opcua.binary_server_asyncio import BinaryServer
15 1
from opcua.internal_server import InternalServer
16 1
from opcua import Node, Subscription, ObjectIds, Event
17 1
from opcua import xmlimporter
18 1
from opcua import Client
19
20
21 1
class Server(object):
22
23
    """
24
    High level Server class
25
    Create an opcua server with default values
26
    The class is very short. Users are adviced to read the code.
27
    Create your own namespace and then populate your server address space
28
    using use the get_root() or get_objects() to get Node objects.
29
    and get_event_object() to fire events.
30
    Then start server. See example_server.py
31
    All methods are threadsafe
32
33
34
    :ivar application_uri:
35
    :vartype application_uri: uri
36
    :ivar product_uri:
37
    :vartype product_uri: uri
38
    :ivar name:
39
    :vartype name: string
40
    :ivar default_timeout: timout in milliseconds for sessions and secure channel
41
    :vartype default_timeout: int
42
    :ivar iserver: internal server object
43
    :vartype default_timeout: InternalServer
44
    :ivar bserver: binary protocol server
45
    :vartype bserver: BinaryServer
46
47
    """
48
49 1
    def __init__(self):
50 1
        self.logger = logging.getLogger(__name__)
51 1
        self.endpoint = urlparse("opc.tcp://0.0.0.0:4841/freeopcua/server/")
52 1
        self.application_uri = "urn:freeopcua:python:server"
53 1
        self.product_uri = "urn:freeopcua.github.no:python:server"
54 1
        self.name = "FreeOpcUa Python Server"
55 1
        self.application_type = ua.ApplicationType.ClientAndServer
56 1
        self.default_timeout = 3600000
57 1
        self.iserver = InternalServer()
58 1
        self.bserver = None
59 1
        self._discovery_client = None
60 1
        self._discovery_period = 60
61
62
        # setup some expected values
63 1
        self.register_namespace(self.application_uri)
64 1
        sa_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_ServerArray))
65 1
        sa_node.set_value([self.application_uri])
66
67
68 1
    def set_application_uri(self, uri):
69
        """
70
        Set application/server URI.
71
        This uri is supposed to be unique. If you intent to register
72
        your server to a discovery server, it really should be unique in
73
        your system!
74
        default is : "urn:freeopcua:python:server"
75
        """
76 1
        self.application_uri = uri
77
78 1
    def find_servers(self, uris=[]):
79
        """
80
        find_servers. mainly implemented for simmetry with client
81
        """
82 1
        params = ua.FindServersParameters()
83 1
        params.EndpointUrl = self.endpoint.geturl()
84 1
        params.ServerUris = uris
85 1
        return self.iserver.find_servers(params)
86
87 1
    def register_to_discovery(self, url, period=60):
88
        """
89
        Register to a OPC-UA Discovery server. Registering must be renewed at
90
        least every 10 minutes, so this method will use our asyncio thread to
91
        re-register every period seconds
92
        """
93 1
        self._discovery_period = period
94 1
        self._discovery_client = Client(url)
95 1
        self._discovery_client.connect()
96 1
        self.iserver.loop.call_soon(self._renew_registration)
97
98 1
    def _renew_registration(self):
99 1
        if self._discovery_client:
100 1
            self._discovery_client.register_server(self)
101 1
            self.iserver.loop.call_later(self._discovery_period, self._renew_registration)
102
103 1
    def allow_remote_admin(self, allow):
104
        """
105
        Enable or disable the builtin Admin user from network clients
106
        """
107
        self.iserver.allow_remote_admin = allow
108
109 1
    def set_endpoint(self, url):
110 1
        self.endpoint = urlparse(url)
111
112 1
    def get_endpoints(self):
113 1
        return self.iserver.get_endpoints()
114
115 1
    def _setup_server_nodes(self):
116
        # to be called just before starting server since it needs all parameters to be setup
117 1
        self._set_endpoints()
118
119 1
    def _set_endpoints(self):
120 1
        idtoken = ua.UserTokenPolicy()
121 1
        idtoken.PolicyId = 'anonymous'
122 1
        idtoken.TokenType = ua.UserTokenType.Anonymous
123
124 1
        idtoken2 = ua.UserTokenPolicy()
125 1
        idtoken2.PolicyId = 'certificate_basic256'
126 1
        idtoken2.TokenType = ua.UserTokenType.Certificate
127
128 1
        idtoken3 = ua.UserTokenPolicy()
129 1
        idtoken3.PolicyId = 'certificate_basic128'
130 1
        idtoken3.TokenType = ua.UserTokenType.Certificate
131
132 1
        idtoken4 = ua.UserTokenPolicy()
133 1
        idtoken4.PolicyId = 'username'
134 1
        idtoken4.TokenType = ua.UserTokenType.UserName
135
136 1
        appdesc = ua.ApplicationDescription()
137 1
        appdesc.ApplicationName = ua.LocalizedText(self.name)
138 1
        appdesc.ApplicationUri = self.application_uri
139 1
        appdesc.ApplicationType = self.application_type
140 1
        appdesc.ProductUri = self.product_uri
141 1
        appdesc.DiscoveryUrls.append(self.endpoint.geturl())
142
143 1
        edp = ua.EndpointDescription()
144 1
        edp.EndpointUrl = self.endpoint.geturl()
145 1
        edp.Server = appdesc
146 1
        edp.SecurityMode = ua.MessageSecurityMode.None_
147 1
        edp.SecurityPolicyUri = 'http://opcfoundation.org/UA/SecurityPolicy#None'
148 1
        edp.UserIdentityTokens = [idtoken, idtoken2, idtoken3]
149 1
        edp.TransportProfileUri = 'http://opcfoundation.org/UA-Profile/Transport/uatcp-uasc-uabinary'
150 1
        edp.SecurityLevel = 0
151 1
        self.iserver.add_endpoint(edp)
152
153 1
    def set_server_name(self, name):
154
        self.name = name
155
156 1
    def start(self):
157
        """
158
        Start to listen on network
159
        """
160 1
        self.iserver.start()
161 1
        self._setup_server_nodes()
162 1
        self.bserver = BinaryServer(self.iserver, self.endpoint.hostname, self.endpoint.port)
163 1
        self.bserver.start()
164
165 1
    def stop(self):
166
        """
167
        Stop server
168
        """
169 1
        if self._discovery_client:
170 1
            self._discovery_client.disconnect()
171 1
        self.bserver.stop()
172 1
        self.iserver.stop()
173
174 1
    def get_root_node(self):
175
        """
176
        Get Root node of server. Returns a Node object.
177
        """
178 1
        return self.get_node(ua.TwoByteNodeId(ObjectIds.RootFolder))
179
180 1
    def get_objects_node(self):
181
        """
182
        Get Objects node of server. Returns a Node object.
183
        """
184 1
        return self.get_node(ua.TwoByteNodeId(ObjectIds.ObjectsFolder))
185
186 1
    def get_server_node(self):
187
        """
188
        Get Server node of server. Returns a Node object.
189
        """
190 1
        return self.get_node(ua.TwoByteNodeId(ObjectIds.Server))
191
192 1
    def get_node(self, nodeid):
193
        """
194
        Get a specific node using NodeId object or a string representing a NodeId
195
        """
196 1
        return Node(self.iserver.isession, nodeid)
197
198 1
    def create_subscription(self, period, handler):
199
        """
200
        Create a subscription.
201
        returns a Subscription object which allow
202
        to subscribe to events or data on server
203
        """
204 1
        params = ua.CreateSubscriptionParameters()
205 1
        params.RequestedPublishingInterval = period
206 1
        params.RequestedLifetimeCount = 3000
207 1
        params.RequestedMaxKeepAliveCount = 10000
208 1
        params.MaxNotificationsPerPublish = 0
209 1
        params.PublishingEnabled = True
210 1
        params.Priority = 0
211 1
        return Subscription(self.iserver.isession, params, handler)
212
213 1
    def get_namespace_array(self):
214
        """
215
        get all namespace defined in server
216
        """
217 1
        ns_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
218 1
        return ns_node.get_value()
219
220 1
    def register_namespace(self, uri):
221
        """
222
        Register a new namespace. Nodes should in custom namespace, not 0.
223
        """
224 1
        ns_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
225 1
        uries = ns_node.get_value()
226 1
        uries.append(uri)
227 1
        ns_node.set_value(uries)
228 1
        return (len(uries) - 1)
229
230 1
    def get_namespace_index(self, uri):
231
        """
232
        get index of a namespace using its uri
233
        """
234 1
        uries = self.get_namespace_array()
235 1
        return uries.index(uri)
236
237 1
    def get_event_object(self, etype=ObjectIds.BaseEventType, source=ObjectIds.Server):
238
        """
239
        Returns an event object using an event type from address space.
240
        Use this object to fire events
241
        """
242
        return Event(self.iserver.isession, etype, source)
243
244 1
    def import_xml(self, path):
245
        """
246
        import nodes defined in xml
247
        """
248 1
        importer = xmlimporter.XmlImporter(self.iserver.node_mgt_service)
249
        importer.import_xml(path)
250