Total Complexity | 42 |
Total Lines | 276 |
Duplicated Lines | 0 % |
Complex classes like opcua.client.UaClient often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
1 | """ |
||
181 | class UaClient(object): |
||
182 | |||
183 | """ |
||
184 | low level OPC-UA client. |
||
185 | implement all(well..one day) methods defined in opcua spec |
||
186 | taking in argument the structures defined in opcua spec |
||
187 | in python most of the structures are defined in |
||
188 | uaprotocol_auto.py and uaprotocol_hand.py |
||
189 | """ |
||
190 | |||
191 | def __init__(self, timeout=1): |
||
192 | self.logger = logging.getLogger(__name__) |
||
193 | # _publishcallbacks should be accessed in recv thread only |
||
194 | self._publishcallbacks = {} |
||
195 | self._timeout = timeout |
||
196 | self._uasocket = None |
||
197 | self._security_policy = ua.SecurityPolicy() |
||
198 | |||
199 | def set_security(self, policy): |
||
200 | self._security_policy = policy |
||
201 | |||
202 | def connect_socket(self, host, port): |
||
203 | """ |
||
204 | connect to server socket and start receiving thread |
||
205 | """ |
||
206 | self._uasocket = UASocketClient(self._timeout, security_policy=self._security_policy) |
||
207 | return self._uasocket.connect_socket(host, port) |
||
208 | |||
209 | def disconnect_socket(self): |
||
210 | return self._uasocket.disconnect_socket() |
||
211 | |||
212 | def send_hello(self, url): |
||
213 | return self._uasocket.send_hello(url) |
||
214 | |||
215 | def open_secure_channel(self, params): |
||
216 | return self._uasocket.open_secure_channel(params) |
||
217 | |||
218 | def close_secure_channel(self): |
||
219 | """ |
||
220 | close secure channel. It seems to trigger a shutdown of socket |
||
221 | in most servers, so be prepare to reconnect |
||
222 | """ |
||
223 | return self._uasocket.close_secure_channel() |
||
224 | |||
225 | def create_session(self, parameters): |
||
226 | self.logger.info("create_session") |
||
227 | request = ua.CreateSessionRequest() |
||
228 | request.Parameters = parameters |
||
229 | data = self._uasocket.send_request(request) |
||
230 | response = ua.CreateSessionResponse.from_binary(data) |
||
231 | response.ResponseHeader.ServiceResult.check() |
||
232 | self._uasocket.authentication_token = response.Parameters.AuthenticationToken |
||
233 | return response.Parameters |
||
234 | |||
235 | def activate_session(self, parameters): |
||
236 | self.logger.info("activate_session") |
||
237 | request = ua.ActivateSessionRequest() |
||
238 | request.Parameters = parameters |
||
239 | data = self._uasocket.send_request(request) |
||
240 | response = ua.ActivateSessionResponse.from_binary(data) |
||
241 | response.ResponseHeader.ServiceResult.check() |
||
242 | return response.Parameters |
||
243 | |||
244 | def close_session(self, deletesubscriptions): |
||
245 | self.logger.info("close_session") |
||
246 | request = ua.CloseSessionRequest() |
||
247 | request.DeleteSubscriptions = deletesubscriptions |
||
248 | data = self._uasocket.send_request(request) |
||
249 | ua.CloseSessionResponse.from_binary(data) |
||
250 | # response.ResponseHeader.ServiceResult.check() #disabled, it seems we sent wrong session Id, but where is the sessionId supposed to be sent??? |
||
251 | |||
252 | def browse(self, parameters): |
||
253 | self.logger.info("browse") |
||
254 | request = ua.BrowseRequest() |
||
255 | request.Parameters = parameters |
||
256 | data = self._uasocket.send_request(request) |
||
257 | response = ua.BrowseResponse.from_binary(data) |
||
258 | response.ResponseHeader.ServiceResult.check() |
||
259 | return response.Results |
||
260 | |||
261 | def read(self, parameters): |
||
262 | self.logger.info("read") |
||
263 | request = ua.ReadRequest() |
||
264 | request.Parameters = parameters |
||
265 | data = self._uasocket.send_request(request) |
||
266 | response = ua.ReadResponse.from_binary(data) |
||
267 | response.ResponseHeader.ServiceResult.check() |
||
268 | # cast to Enum attributes that need to |
||
269 | for idx, rv in enumerate(parameters.NodesToRead): |
||
270 | if rv.AttributeId == ua.AttributeIds.NodeClass: |
||
271 | dv = response.Results[idx] |
||
272 | if dv.StatusCode.is_good(): |
||
273 | dv.Value.Value = ua.NodeClass(dv.Value.Value) |
||
274 | elif rv.AttributeId == ua.AttributeIds.ValueRank: |
||
275 | dv = response.Results[idx] |
||
276 | if dv.StatusCode.is_good() and dv.Value.Value in (-3, -2, -1, 0, 1, 2, 3, 4): |
||
277 | dv.Value.Value = ua.ValueRank(dv.Value.Value) |
||
278 | return response.Results |
||
279 | |||
280 | def write(self, params): |
||
281 | self.logger.info("read") |
||
282 | request = ua.WriteRequest() |
||
283 | request.Parameters = params |
||
284 | data = self._uasocket.send_request(request) |
||
285 | response = ua.WriteResponse.from_binary(data) |
||
286 | response.ResponseHeader.ServiceResult.check() |
||
287 | return response.Results |
||
288 | |||
289 | def get_endpoints(self, params): |
||
290 | self.logger.info("get_endpoint") |
||
291 | request = ua.GetEndpointsRequest() |
||
292 | request.Parameters = params |
||
293 | data = self._uasocket.send_request(request) |
||
294 | response = ua.GetEndpointsResponse.from_binary(data) |
||
295 | response.ResponseHeader.ServiceResult.check() |
||
296 | return response.Endpoints |
||
297 | |||
298 | def find_servers(self, params): |
||
299 | self.logger.info("find_servers") |
||
300 | request = ua.FindServersRequest() |
||
301 | request.Parameters = params |
||
302 | data = self._uasocket.send_request(request) |
||
303 | response = ua.FindServersResponse.from_binary(data) |
||
304 | response.ResponseHeader.ServiceResult.check() |
||
305 | return response.Servers |
||
306 | |||
307 | def find_servers_on_network(self, params): |
||
308 | self.logger.info("find_servers_on_network") |
||
309 | request = ua.FindServersOnNetworkRequest() |
||
310 | request.Parameters = params |
||
311 | data = self._uasocket.send_request(request) |
||
312 | response = ua.FindServersOnNetworkResponse.from_binary(data) |
||
313 | response.ResponseHeader.ServiceResult.check() |
||
314 | return response.Parameters |
||
315 | |||
316 | def register_server(self, registered_server): |
||
317 | self.logger.info("register_server") |
||
318 | request = ua.RegisterServerRequest() |
||
319 | request.Server = registered_server |
||
320 | data = self._uasocket.send_request(request) |
||
321 | response = ua.RegisterServerResponse.from_binary(data) |
||
322 | response.ResponseHeader.ServiceResult.check() |
||
323 | # nothing to return for this service |
||
324 | |||
325 | def register_server2(self, params): |
||
326 | self.logger.info("register_server2") |
||
327 | request = ua.RegisterServer2Request() |
||
328 | request.Parameters = params |
||
329 | data = self._uasocket.send_request(request) |
||
330 | response = ua.RegisterServer2Response.from_binary(data) |
||
331 | response.ResponseHeader.ServiceResult.check() |
||
332 | return response.ConfigurationResults |
||
333 | |||
334 | def translate_browsepaths_to_nodeids(self, browsepaths): |
||
335 | self.logger.info("translate_browsepath_to_nodeid") |
||
336 | request = ua.TranslateBrowsePathsToNodeIdsRequest() |
||
337 | request.Parameters.BrowsePaths = browsepaths |
||
338 | data = self._uasocket.send_request(request) |
||
339 | response = ua.TranslateBrowsePathsToNodeIdsResponse.from_binary(data) |
||
340 | response.ResponseHeader.ServiceResult.check() |
||
341 | return response.Results |
||
342 | |||
343 | def create_subscription(self, params, callback): |
||
344 | self.logger.info("create_subscription") |
||
345 | request = ua.CreateSubscriptionRequest() |
||
346 | request.Parameters = params |
||
347 | resp_fut = Future() |
||
348 | mycallbak = partial(self._create_subscription_callback, callback, resp_fut) |
||
349 | self._uasocket.send_request(request, mycallbak) |
||
350 | return resp_fut.result(self._timeout) |
||
351 | |||
352 | def _create_subscription_callback(self, pub_callback, resp_fut, data_fut): |
||
353 | self.logger.info("_create_subscription_callback") |
||
354 | data = data_fut.result() |
||
355 | response = ua.CreateSubscriptionResponse.from_binary(data) |
||
356 | response.ResponseHeader.ServiceResult.check() |
||
357 | self._publishcallbacks[response.Parameters.SubscriptionId] = pub_callback |
||
358 | resp_fut.set_result(response.Parameters) |
||
359 | |||
360 | def delete_subscriptions(self, subscriptionids): |
||
361 | self.logger.info("delete_subscription") |
||
362 | request = ua.DeleteSubscriptionsRequest() |
||
363 | request.Parameters.SubscriptionIds = subscriptionids |
||
364 | resp_fut = Future() |
||
365 | mycallbak = partial(self._delete_subscriptions_callback, subscriptionids, resp_fut) |
||
366 | self._uasocket.send_request(request, mycallbak) |
||
367 | return resp_fut.result(self._timeout) |
||
368 | |||
369 | def _delete_subscriptions_callback(self, subscriptionids, resp_fut, data_fut): |
||
370 | self.logger.info("_delete_subscriptions_callback") |
||
371 | data = data_fut.result() |
||
372 | response = ua.DeleteSubscriptionsResponse.from_binary(data) |
||
373 | response.ResponseHeader.ServiceResult.check() |
||
374 | for sid in subscriptionids: |
||
375 | self._publishcallbacks.pop(sid) |
||
376 | resp_fut.set_result(response.Results) |
||
377 | |||
378 | def publish(self, acks=None): |
||
379 | self.logger.info("publish") |
||
380 | if acks is None: |
||
381 | acks = [] |
||
382 | request = ua.PublishRequest() |
||
383 | request.Parameters.SubscriptionAcknowledgements = acks |
||
384 | self._uasocket.send_request(request, self._call_publish_callback, timeout=int(9e8)) # timeout could be set to 0 but some servers to not support it |
||
385 | |||
386 | def _call_publish_callback(self, future): |
||
387 | self.logger.info("call_publish_callback") |
||
388 | data = future.result() |
||
389 | self._uasocket.check_answer(data, "ServiceFault received from server while waiting for publish response") |
||
390 | try: |
||
391 | response = ua.PublishResponse.from_binary(data) |
||
392 | except Exception: |
||
393 | self.logger.exception("Error parsing notificatipn from server") |
||
394 | self.publish([]) #send publish request ot server so he does stop sending notifications |
||
395 | return |
||
396 | if response.Parameters.SubscriptionId not in self._publishcallbacks: |
||
397 | self.logger.warning("Received data for unknown subscription: %s ", response.Parameters.SubscriptionId) |
||
398 | return |
||
399 | callback = self._publishcallbacks[response.Parameters.SubscriptionId] |
||
400 | try: |
||
401 | callback(response.Parameters) |
||
402 | except Exception: # we call client code, catch everything! |
||
403 | self.logger.exception("Exception while calling user callback: %s") |
||
404 | |||
405 | def create_monitored_items(self, params): |
||
406 | self.logger.info("create_monitored_items") |
||
407 | request = ua.CreateMonitoredItemsRequest() |
||
408 | request.Parameters = params |
||
409 | data = self._uasocket.send_request(request) |
||
410 | response = ua.CreateMonitoredItemsResponse.from_binary(data) |
||
411 | response.ResponseHeader.ServiceResult.check() |
||
412 | return response.Results |
||
413 | |||
414 | def delete_monitored_items(self, params): |
||
415 | self.logger.info("delete_monitored_items") |
||
416 | request = ua.DeleteMonitoredItemsRequest() |
||
417 | request.Parameters = params |
||
418 | data = self._uasocket.send_request(request) |
||
419 | response = ua.DeleteMonitoredItemsResponse.from_binary(data) |
||
420 | response.ResponseHeader.ServiceResult.check() |
||
421 | return response.Results |
||
422 | |||
423 | def add_nodes(self, nodestoadd): |
||
424 | self.logger.info("add_nodes") |
||
425 | request = ua.AddNodesRequest() |
||
426 | request.Parameters.NodesToAdd = nodestoadd |
||
427 | data = self._uasocket.send_request(request) |
||
428 | response = ua.AddNodesResponse.from_binary(data) |
||
429 | response.ResponseHeader.ServiceResult.check() |
||
430 | return response.Results |
||
431 | |||
432 | def delete_nodes(self, nodestodelete): |
||
433 | self.logger.info("delete_nodes") |
||
434 | request = ua.DeleteNodesRequest() |
||
435 | request.Parameters.NodesToDelete = nodestodelete |
||
436 | data = self._uasocket.send_request(request) |
||
437 | response = ua.DeleteNodesResponse.from_binary(data) |
||
438 | response.ResponseHeader.ServiceResult.check() |
||
439 | return response.Results |
||
440 | |||
441 | def call(self, methodstocall): |
||
442 | request = ua.CallRequest() |
||
443 | request.Parameters.MethodsToCall = methodstocall |
||
444 | data = self._uasocket.send_request(request) |
||
445 | response = ua.CallResponse.from_binary(data) |
||
446 | response.ResponseHeader.ServiceResult.check() |
||
447 | return response.Results |
||
448 | |||
449 | def history_read(self, params): |
||
450 | self.logger.info("history_read") |
||
451 | request = ua.HistoryReadRequest() |
||
452 | request.Parameters = params |
||
453 | data = self._uasocket.send_request(request) |
||
454 | response = ua.HistoryReadResponse.from_binary(data) |
||
455 | response.ResponseHeader.ServiceResult.check() |
||
456 | return response.Results |
||
457 |