Completed
Pull Request — master (#70)
by Olivier
03:03
created

opcua.Server._set_endpoints()   B

Complexity

Conditions 1

Size

Total Lines 33

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 28
CRAP Score 1
Metric Value
dl 0
loc 33
ccs 28
cts 28
cp 1
rs 8.8571
cc 1
crap 1
"""
High level interface to pure python OPC-UA server
"""
4
5 1
import logging
6 1
try:
7 1
    from urllib.parse import urlparse
8
except ImportError:
9
    from urlparse import urlparse
10
11
12 1
from opcua import ua
13
#from opcua.binary_server import BinaryServer
14 1
from opcua.binary_server_asyncio import BinaryServer
15 1
from opcua.internal_server import InternalServer
16 1
from opcua import Node, Subscription, ObjectIds, Event
17 1
from opcua import xmlimporter
18 1
from opcua import Client
19
20
21 1
class Server(object):

    """
    High level Server class

    Create an opcua server with default values.
    The class is very short. Users are advised to read the code.
    Create your own namespace and then populate your server address space
    using get_root_node() or get_objects_node() to get Node objects,
    and get_event_object() to fire events.
    Then start server. See example_server.py
    All methods are threadsafe

    :ivar application_uri:
    :vartype application_uri: uri
    :ivar product_uri:
    :vartype product_uri: uri
    :ivar name:
    :vartype name: string
    :ivar default_timeout: timeout in milliseconds for sessions and secure channel
    :vartype default_timeout: int
    :ivar iserver: internal server object
    :vartype iserver: InternalServer
    :ivar bserver: binary protocol server
    :vartype bserver: BinaryServer

    """

    def __init__(self):
        self.logger = logging.getLogger(__name__)
        self.endpoint = urlparse("opc.tcp://0.0.0.0:4841/freeopcua/server/")
        self.application_uri = "urn:freeopcua:python:server"
        self.product_uri = "urn:freeopcua.github.no:python:server"
        self.name = "FreeOpcUa Python Server"
        self.application_type = ua.ApplicationType.ClientAndServer
        self.default_timeout = 3600000
        self.iserver = InternalServer()
        # the binary server is only created in start()
        self.bserver = None
        # discovery registration is only active after register_to_discovery()
        self._discovery_client = None
        self._discovery_period = 60

        # setup some expected values
        self.register_namespace(self.application_uri)
        sa_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_ServerArray))
        sa_node.set_value([self.application_uri])

    def disable_clock(self, val=True):
        """
        for debugging you may want to disable clock that write every second
        to address space
        """
        self.iserver.disabled_clock = val

    def set_application_uri(self, uri):
        """
        Set application/server URI.
        This uri is supposed to be unique. If you intend to register
        your server to a discovery server, it really should be unique in
        your system!
        default is : "urn:freeopcua:python:server"
        """
        self.application_uri = uri

    def find_servers(self, uris=None):
        """
        find_servers. mainly implemented for symmetry with client

        :param uris: optional list of server uris passed as ServerUris;
            an empty list is used when None
        :return: whatever the internal server's find_servers() returns
        """
        if uris is None:
            uris = []
        params = ua.FindServersParameters()
        params.EndpointUrl = self.endpoint.geturl()
        params.ServerUris = uris
        return self.iserver.find_servers(params)

    def register_to_discovery(self, url, period=60):
        """
        Register to a OPC-UA Discovery server. Registering must be renewed at
        least every 10 minutes, so this method will use our asyncio thread to
        re-register every period seconds

        :param url: url of the discovery server to connect to
        :param period: delay between two registrations
        """
        self._discovery_period = period
        self._discovery_client = Client(url)
        self._discovery_client.connect()
        self.iserver.loop.call_soon(self._renew_registration)

    def _renew_registration(self):
        """
        Register this server on the discovery server and schedule the next
        renewal. Does nothing when no discovery client is set.
        """
        if self._discovery_client:
            self._discovery_client.register_server(self)
            self.iserver.loop.call_later(self._discovery_period, self._renew_registration)

    def allow_remote_admin(self, allow):
        """
        Enable or disable the builtin Admin user from network clients
        """
        self.iserver.allow_remote_admin = allow

    def set_endpoint(self, url):
        """
        Set the endpoint url of the server. The url is parsed with urlparse;
        its hostname and port are handed to the binary server in start().
        """
        self.endpoint = urlparse(url)

    def get_endpoints(self):
        """
        Return the endpoints known by the internal server
        """
        return self.iserver.get_endpoints()

    def _setup_server_nodes(self):
        # to be called just before starting server since it needs all parameters to be setup
        self._set_endpoints()

    def _set_endpoints(self):
        """
        Build the endpoint description (no message security) from the current
        server settings and add it to the internal server.
        """
        idtoken = ua.UserTokenPolicy()
        idtoken.PolicyId = 'anonymous'
        idtoken.TokenType = ua.UserTokenType.Anonymous

        idtoken2 = ua.UserTokenPolicy()
        idtoken2.PolicyId = 'certificate_basic256'
        idtoken2.TokenType = ua.UserTokenType.Certificate

        idtoken3 = ua.UserTokenPolicy()
        idtoken3.PolicyId = 'certificate_basic128'
        idtoken3.TokenType = ua.UserTokenType.Certificate

        idtoken4 = ua.UserTokenPolicy()
        idtoken4.PolicyId = 'username'
        idtoken4.TokenType = ua.UserTokenType.UserName

        appdesc = ua.ApplicationDescription()
        appdesc.ApplicationName = ua.LocalizedText(self.name)
        appdesc.ApplicationUri = self.application_uri
        appdesc.ApplicationType = self.application_type
        appdesc.ProductUri = self.product_uri
        appdesc.DiscoveryUrls.append(self.endpoint.geturl())

        edp = ua.EndpointDescription()
        edp.EndpointUrl = self.endpoint.geturl()
        edp.Server = appdesc
        edp.SecurityMode = ua.MessageSecurityMode.None_
        edp.SecurityPolicyUri = 'http://opcfoundation.org/UA/SecurityPolicy#None'
        # The username policy used to be built but left out of this list, so
        # it was never advertised to clients.
        # NOTE(review): assumes the session layer accepts username tokens - confirm
        edp.UserIdentityTokens = [idtoken, idtoken2, idtoken3, idtoken4]
        edp.TransportProfileUri = 'http://opcfoundation.org/UA-Profile/Transport/uatcp-uasc-uabinary'
        edp.SecurityLevel = 0
        self.iserver.add_endpoint(edp)

    def set_server_name(self, name):
        """
        Set the name used as ApplicationName in the endpoint description
        """
        self.name = name

    def start(self):
        """
        Start to listen on network
        """
        self.iserver.start()
        self._setup_server_nodes()
        self.bserver = BinaryServer(self.iserver, self.endpoint.hostname, self.endpoint.port)
        self.bserver.start()

    def stop(self):
        """
        Stop server
        """
        if self._discovery_client:
            self._discovery_client.disconnect()
        self.bserver.stop()
        self.iserver.stop()

    def get_root_node(self):
        """
        Get Root node of server. Returns a Node object.
        """
        return self.get_node(ua.TwoByteNodeId(ObjectIds.RootFolder))

    def get_objects_node(self):
        """
        Get Objects node of server. Returns a Node object.
        """
        return self.get_node(ua.TwoByteNodeId(ObjectIds.ObjectsFolder))

    def get_server_node(self):
        """
        Get Server node of server. Returns a Node object.
        """
        return self.get_node(ua.TwoByteNodeId(ObjectIds.Server))

    def get_node(self, nodeid):
        """
        Get a specific node using NodeId object or a string representing a NodeId
        """
        return Node(self.iserver.isession, nodeid)

    def create_subscription(self, period, handler):
        """
        Create a subscription.
        returns a Subscription object which allow
        to subscribe to events or data on server

        :param period: value used as RequestedPublishingInterval
        :param handler: handler object passed to the Subscription
        """
        params = ua.CreateSubscriptionParameters()
        params.RequestedPublishingInterval = period
        params.RequestedLifetimeCount = 3000
        params.RequestedMaxKeepAliveCount = 10000
        params.MaxNotificationsPerPublish = 0
        params.PublishingEnabled = True
        params.Priority = 0
        return Subscription(self.iserver.isession, params, handler)

    def get_namespace_array(self):
        """
        get all namespace defined in server
        """
        ns_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
        return ns_node.get_value()

    def register_namespace(self, uri):
        """
        Register a new namespace. Nodes should be in a custom namespace, not 0.
        Returns the index of the namespace. If the uri is already registered
        its existing index is returned and the namespace array is left
        untouched, so a uri never appears twice.
        """
        ns_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
        uries = ns_node.get_value()
        if uri in uries:
            return uries.index(uri)
        uries.append(uri)
        ns_node.set_value(uries)
        return len(uries) - 1

    def get_namespace_index(self, uri):
        """
        get index of a namespace using its uri
        """
        uries = self.get_namespace_array()
        # index() raises when the uri has not been registered
        return uries.index(uri)

    def get_event_object(self, etype=ObjectIds.BaseEventType, source=ObjectIds.Server):
        """
        Returns an event object using an event type from address space.
        Use this object to fire events
        """
        return Event(self.iserver.isession, etype, source)

    def import_xml(self, path):
        """
        import nodes defined in xml
        """
        importer = xmlimporter.XmlImporter(self.iserver.node_mgt_service)
        importer.import_xml(path)
258