Completed
Pull Request — master (#70)
by Olivier
02:19
created

opcua.Server.set_application_uri()   A

Complexity

Conditions 1

Size

Total Lines 9

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1
Metric Value
dl 0
loc 9
ccs 2
cts 2
cp 1
rs 9.6667
cc 1
crap 1
1
"""
2
High level interface to pure python OPC-UA server
3
"""
4
5 1
import logging
6 1
try:
7 1
    from urllib.parse import urlparse
8
except ImportError:
9
    from urlparse import urlparse
10
11
12 1
from opcua import ua
13
#from opcua.binary_server import BinaryServer
14 1
from opcua.binary_server_asyncio import BinaryServer
15 1
from opcua.internal_server import InternalServer
16 1
from opcua import Node, Subscription, ObjectIds, Event
17 1
from opcua import xmlimporter
18 1
from opcua import Client
19
20
21 1
class Server(object):
22
23
    """
24
    High level Server class
25
    Create an opcua server with default values
26
    The class is very short. Users are adviced to read the code.
27
    Create your own namespace and then populate your server address space
28
    using use the get_root() or get_objects() to get Node objects.
29
    and get_event_object() to fire events.
30
    Then start server. See example_server.py
31
    All methods are threadsafe
32
33
34
    :ivar application_uri:
35
    :vartype application_uri: uri
36
    :ivar product_uri:
37
    :vartype product_uri: uri
38
    :ivar name:
39
    :vartype name: string
40
    :ivar default_timeout: timout in milliseconds for sessions and secure channel
41
    :vartype default_timeout: int
42
    :ivar iserver: internal server object
43
    :vartype default_timeout: InternalServer
44
    :ivar bserver: binary protocol server
45
    :vartype bserver: BinaryServer
46
47
    """
48
49 1
    def __init__(self):
50 1
        self.logger = logging.getLogger(__name__)
51 1
        self.endpoint = urlparse("opc.tcp://0.0.0.0:4841/freeopcua/server/")
52 1
        self.application_uri = "urn:freeopcua:python:server"
53 1
        self.product_uri = "urn:freeopcua.github.no:python:server"
54 1
        self.name = "FreeOpcUa Python Server"
55 1
        self.application_type = ua.ApplicationType.ClientAndServer
56 1
        self.default_timeout = 3600000
57 1
        self.iserver = InternalServer()
58 1
        self.bserver = None
59 1
        self._discovery_client = None
60 1
        self._discovery_period = 60
61
62
        # setup some expected values
63 1
        self.register_namespace(self.application_uri)
64 1
        sa_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_ServerArray))
65 1
        sa_node.set_value([self.application_uri])
66
67 1
    def set_application_uri(self, uri):
68
        """
69
        Set application/server URI.
70
        This uri is supposed to be unique. If you intent to register
71
        your server to a discovery server, it really should be unique in
72
        your system!
73
        default is : "urn:freeopcua:python:server"
74
        """
75 1
        self.application_uri = uri
76
77 1
    def find_servers(self, uris=[]):
78
        """
79
        find_servers. mainly implemented for simmetry with client
80
        """
81 1
        params = ua.FindServersParameters()
82 1
        params.EndpointUrl = self.endpoint.geturl()
83 1
        params.ServerUris = uris
84 1
        return self.iserver.find_servers(params)
85
86 1
    def register_to_discovery(self, url, period=60):
87
        """
88
        Register to a OPC-UA Discovery server. Registering must be renewed at
89
        least every 10 minutes, so this method will use our asyncio thread to
90
        re-register every period seconds
91
        """
92 1
        self._discovery_period = period
93 1
        self._discovery_client = Client(url)
94 1
        self._discovery_client.connect()
95 1
        self.iserver.loop.call_soon(self._renew_registration)
96
97 1
    def _renew_registration(self):
98 1
        if self._discovery_client:
99 1
            self._discovery_client.register_server(self)
100 1
            self.iserver.loop.call_later(self._discovery_period, self._renew_registration)
101
102 1
    def allow_remote_admin(self, allow):
103
        """
104
        Enable or disable the builtin Admin user from network clients
105
        """
106
        self.iserver.allow_remote_admin = allow
107
108 1
    def set_endpoint(self, url):
109 1
        self.endpoint = urlparse(url)
110
111 1
    def get_endpoints(self):
112 1
        return self.iserver.get_endpoints()
113
114 1
    def _setup_server_nodes(self):
115
        # to be called just before starting server since it needs all parameters to be setup
116 1
        self._set_endpoints()
117
118 1
    def _set_endpoints(self):
119 1
        idtoken = ua.UserTokenPolicy()
120 1
        idtoken.PolicyId = 'anonymous'
121 1
        idtoken.TokenType = ua.UserTokenType.Anonymous
122
123 1
        idtoken2 = ua.UserTokenPolicy()
124 1
        idtoken2.PolicyId = 'certificate_basic256'
125 1
        idtoken2.TokenType = ua.UserTokenType.Certificate
126
127 1
        idtoken3 = ua.UserTokenPolicy()
128 1
        idtoken3.PolicyId = 'certificate_basic128'
129 1
        idtoken3.TokenType = ua.UserTokenType.Certificate
130
131 1
        idtoken4 = ua.UserTokenPolicy()
132 1
        idtoken4.PolicyId = 'username'
133 1
        idtoken4.TokenType = ua.UserTokenType.UserName
134
135 1
        appdesc = ua.ApplicationDescription()
136 1
        appdesc.ApplicationName = ua.LocalizedText(self.name)
137 1
        appdesc.ApplicationUri = self.application_uri
138 1
        appdesc.ApplicationType = self.application_type
139 1
        appdesc.ProductUri = self.product_uri
140 1
        appdesc.DiscoveryUrls.append(self.endpoint.geturl())
141
142 1
        edp = ua.EndpointDescription()
143 1
        edp.EndpointUrl = self.endpoint.geturl()
144 1
        edp.Server = appdesc
145 1
        edp.SecurityMode = ua.MessageSecurityMode.None_
146 1
        edp.SecurityPolicyUri = 'http://opcfoundation.org/UA/SecurityPolicy#None'
147 1
        edp.UserIdentityTokens = [idtoken, idtoken2, idtoken3]
148 1
        edp.TransportProfileUri = 'http://opcfoundation.org/UA-Profile/Transport/uatcp-uasc-uabinary'
149 1
        edp.SecurityLevel = 0
150 1
        self.iserver.add_endpoint(edp)
151
152 1
    def set_server_name(self, name):
153
        self.name = name
154
155 1
    def start(self):
156
        """
157
        Start to listen on network
158
        """
159 1
        self.iserver.start()
160 1
        self._setup_server_nodes()
161 1
        self.bserver = BinaryServer(self.iserver, self.endpoint.hostname, self.endpoint.port)
162 1
        self.bserver.start()
163
164 1
    def stop(self):
165
        """
166
        Stop server
167
        """
168 1
        if self._discovery_client:
169 1
            self._discovery_client.disconnect()
170 1
        self.bserver.stop()
171 1
        self.iserver.stop()
172
173 1
    def get_root_node(self):
174
        """
175
        Get Root node of server. Returns a Node object.
176
        """
177 1
        return self.get_node(ua.TwoByteNodeId(ObjectIds.RootFolder))
178
179 1
    def get_objects_node(self):
180
        """
181
        Get Objects node of server. Returns a Node object.
182
        """
183 1
        return self.get_node(ua.TwoByteNodeId(ObjectIds.ObjectsFolder))
184
185 1
    def get_server_node(self):
186
        """
187
        Get Server node of server. Returns a Node object.
188
        """
189 1
        return self.get_node(ua.TwoByteNodeId(ObjectIds.Server))
190
191 1
    def get_node(self, nodeid):
192
        """
193
        Get a specific node using NodeId object or a string representing a NodeId
194
        """
195 1
        return Node(self.iserver.isession, nodeid)
196
197 1
    def create_subscription(self, period, handler):
198
        """
199
        Create a subscription.
200
        returns a Subscription object which allow
201
        to subscribe to events or data on server
202
        """
203 1
        params = ua.CreateSubscriptionParameters()
204 1
        params.RequestedPublishingInterval = period
205 1
        params.RequestedLifetimeCount = 3000
206 1
        params.RequestedMaxKeepAliveCount = 10000
207 1
        params.MaxNotificationsPerPublish = 0
208 1
        params.PublishingEnabled = True
209 1
        params.Priority = 0
210 1
        return Subscription(self.iserver.isession, params, handler)
211
212 1
    def get_namespace_array(self):
213
        """
214
        get all namespace defined in server
215
        """
216 1
        ns_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
217 1
        return ns_node.get_value()
218
219 1
    def register_namespace(self, uri):
220
        """
221
        Register a new namespace. Nodes should in custom namespace, not 0.
222
        """
223 1
        ns_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
224 1
        uries = ns_node.get_value()
225 1
        uries.append(uri)
226 1
        ns_node.set_value(uries)
227 1
        return (len(uries) - 1)
228
229 1
    def get_namespace_index(self, uri):
230
        """
231
        get index of a namespace using its uri
232
        """
233 1
        uries = self.get_namespace_array()
234 1
        return uries.index(uri)
235
236 1
    def get_event_object(self, etype=ObjectIds.BaseEventType, source=ObjectIds.Server):
237
        """
238
        Returns an event object using an event type from address space.
239
        Use this object to fire events
240
        """
241
        return Event(self.iserver.isession, etype, source)
242
243 1
    def import_xml(self, path):
244
        """
245
        import nodes defined in xml
246
        """
247 1
        importer = xmlimporter.XmlImporter(self.iserver.node_mgt_service)
248
        importer.import_xml(path)
249