| Total Complexity | 42 |
| Total Lines | 276 |
| Duplicated Lines | 0 % |
Complex classes like opcua.client.UaClient often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
| 1 | """ |
||
class UaClient(object):

    """
    Low level OPC-UA client.

    Implements (well.. one day all of) the service methods defined in the
    OPC-UA spec, taking as arguments the structures defined in that spec.

    In python most of these structures are defined in
    uaprotocol_auto.py and uaprotocol_hand.py

    Every service method builds the matching ``ua.*Request``, sends it
    through the underlying ``UASocketClient`` and decodes the reply with
    the matching ``ua.*Response.from_binary``.
    """

    def __init__(self, timeout=1):
        """
        :param timeout: value handed to UASocketClient and used as the
            maximum wait on the futures of create_subscription and
            delete_subscriptions
        """
        self.logger = logging.getLogger(__name__)
        # _publishcallbacks should be accessed in recv thread only
        # maps SubscriptionId -> callback given to create_subscription
        self._publishcallbacks = {}
        self._timeout = timeout
        # created by connect_socket(); None until then
        self._uasocket = None
        self._security_policy = ua.SecurityPolicy()

    def set_security(self, policy):
        """
        Store the security policy handed to the socket created by the
        next connect_socket() call.
        """
        self._security_policy = policy

    def connect_socket(self, host, port):
        """
        connect to server socket and start receiving thread
        """
        self._uasocket = UASocketClient(self._timeout, security_policy=self._security_policy)
        return self._uasocket.connect_socket(host, port)

    def disconnect_socket(self):
        """
        Forward to the socket's disconnect_socket() and return its result.
        """
        return self._uasocket.disconnect_socket()

    def send_hello(self, url):
        """
        Forward to the socket's send_hello(url) and return its result.
        """
        return self._uasocket.send_hello(url)

    def open_secure_channel(self, params):
        """
        Forward to the socket's open_secure_channel(params) and return
        its result.
        """
        return self._uasocket.open_secure_channel(params)

    def close_secure_channel(self):
        """
        close secure channel. It seems to trigger a shutdown of socket
        in most servers, so be prepared to reconnect
        """
        return self._uasocket.close_secure_channel()

    def _send_checked(self, request, response_class):
        """
        Send request over the socket, decode the raw reply with
        response_class.from_binary and run ServiceResult.check() on the
        decoded response header.

        :return: the decoded response object
        """
        data = self._uasocket.send_request(request)
        response = response_class.from_binary(data)
        response.ResponseHeader.ServiceResult.check()
        return response

    def create_session(self, parameters):
        """
        Send a CreateSessionRequest and return the response Parameters.

        The AuthenticationToken of the response is stored on the socket.
        """
        self.logger.info("create_session")
        request = ua.CreateSessionRequest()
        request.Parameters = parameters
        response = self._send_checked(request, ua.CreateSessionResponse)
        self._uasocket.authentication_token = response.Parameters.AuthenticationToken
        return response.Parameters

    def activate_session(self, parameters):
        """
        Send an ActivateSessionRequest and return the response Parameters.
        """
        self.logger.info("activate_session")
        request = ua.ActivateSessionRequest()
        request.Parameters = parameters
        return self._send_checked(request, ua.ActivateSessionResponse).Parameters

    def close_session(self, deletesubscriptions):
        """
        Send a CloseSessionRequest with the given DeleteSubscriptions flag.

        The reply is decoded but its service result is not checked.
        Nothing is returned.
        """
        self.logger.info("close_session")
        request = ua.CloseSessionRequest()
        request.DeleteSubscriptions = deletesubscriptions
        data = self._uasocket.send_request(request)
        ua.CloseSessionResponse.from_binary(data)
        # ServiceResult.check() is deliberately not called here: it was
        # disabled because it seems we send a wrong session Id, but where
        # is the sessionId supposed to be sent???

    def browse(self, parameters):
        """
        Send a BrowseRequest and return the response Results.
        """
        self.logger.info("browse")
        request = ua.BrowseRequest()
        request.Parameters = parameters
        return self._send_checked(request, ua.BrowseResponse).Results

    def read(self, parameters):
        """
        Send a ReadRequest and return the response Results.

        Results of NodeClass and ValueRank attribute reads with a good
        status are converted in place to ua.NodeClass / ua.ValueRank.
        """
        self.logger.info("read")
        request = ua.ReadRequest()
        request.Parameters = parameters
        response = self._send_checked(request, ua.ReadResponse)
        # cast to Enum the attributes that need it; results are matched
        # to the requested nodes by position
        for idx, rv in enumerate(parameters.NodesToRead):
            if rv.AttributeId == ua.AttributeIds.NodeClass:
                dv = response.Results[idx]
                if dv.StatusCode.is_good():
                    dv.Value.Value = ua.NodeClass(dv.Value.Value)
            elif rv.AttributeId == ua.AttributeIds.ValueRank:
                dv = response.Results[idx]
                # only these values are converted, any other value is
                # left as received
                if dv.StatusCode.is_good() and dv.Value.Value in (-3, -2, -1, 0, 1, 2, 3, 4):
                    dv.Value.Value = ua.ValueRank(dv.Value.Value)
        return response.Results

    def write(self, params):
        """
        Send a WriteRequest and return the response Results.
        """
        # was logging "read", a copy-paste slip from read()
        self.logger.info("write")
        request = ua.WriteRequest()
        request.Parameters = params
        return self._send_checked(request, ua.WriteResponse).Results

    def get_endpoints(self, params):
        """
        Send a GetEndpointsRequest and return the response Endpoints.
        """
        self.logger.info("get_endpoint")
        request = ua.GetEndpointsRequest()
        request.Parameters = params
        return self._send_checked(request, ua.GetEndpointsResponse).Endpoints

    def find_servers(self, params):
        """
        Send a FindServersRequest and return the response Servers.
        """
        self.logger.info("find_servers")
        request = ua.FindServersRequest()
        request.Parameters = params
        return self._send_checked(request, ua.FindServersResponse).Servers

    def find_servers_on_network(self, params):
        """
        Send a FindServersOnNetworkRequest and return the response
        Parameters.
        """
        self.logger.info("find_servers_on_network")
        request = ua.FindServersOnNetworkRequest()
        request.Parameters = params
        return self._send_checked(request, ua.FindServersOnNetworkResponse).Parameters

    def register_server(self, registered_server):
        """
        Send a RegisterServerRequest for registered_server.
        """
        self.logger.info("register_server")
        request = ua.RegisterServerRequest()
        request.Server = registered_server
        self._send_checked(request, ua.RegisterServerResponse)
        # nothing to return for this service

    def register_server2(self, params):
        """
        Send a RegisterServer2Request and return the response
        ConfigurationResults.
        """
        self.logger.info("register_server2")
        request = ua.RegisterServer2Request()
        request.Parameters = params
        return self._send_checked(request, ua.RegisterServer2Response).ConfigurationResults

    def translate_browsepaths_to_nodeids(self, browsepaths):
        """
        Send a TranslateBrowsePathsToNodeIdsRequest for browsepaths and
        return the response Results.
        """
        self.logger.info("translate_browsepath_to_nodeid")
        request = ua.TranslateBrowsePathsToNodeIdsRequest()
        request.Parameters.BrowsePaths = browsepaths
        return self._send_checked(request, ua.TranslateBrowsePathsToNodeIdsResponse).Results

    def create_subscription(self, params, callback):
        """
        Send a CreateSubscriptionRequest and return the response
        Parameters.

        callback is registered for the new SubscriptionId by
        _create_subscription_callback and later invoked from
        _call_publish_callback. Waits at most self._timeout for the
        response future.
        """
        self.logger.info("create_subscription")
        request = ua.CreateSubscriptionRequest()
        request.Parameters = params
        resp_fut = Future()
        on_response = partial(self._create_subscription_callback, callback, resp_fut)
        self._uasocket.send_request(request, on_response)
        return resp_fut.result(self._timeout)

    def _create_subscription_callback(self, pub_callback, resp_fut, data_fut):
        """
        Decode the CreateSubscriptionResponse held by data_fut, register
        pub_callback under the new SubscriptionId and resolve resp_fut
        with the response Parameters.
        """
        self.logger.info("_create_subscription_callback")
        data = data_fut.result()
        response = ua.CreateSubscriptionResponse.from_binary(data)
        # NOTE(review): if anything raises before set_result, resp_fut is
        # never resolved here - presumably the waiting caller then only
        # sees its timeout; confirm against UASocketClient
        response.ResponseHeader.ServiceResult.check()
        self._publishcallbacks[response.Parameters.SubscriptionId] = pub_callback
        resp_fut.set_result(response.Parameters)

    def delete_subscriptions(self, subscriptionids):
        """
        Send a DeleteSubscriptionsRequest for subscriptionids and return
        the response Results.

        Waits at most self._timeout for the response future.
        """
        self.logger.info("delete_subscription")
        request = ua.DeleteSubscriptionsRequest()
        request.Parameters.SubscriptionIds = subscriptionids
        resp_fut = Future()
        on_response = partial(self._delete_subscriptions_callback, subscriptionids, resp_fut)
        self._uasocket.send_request(request, on_response)
        return resp_fut.result(self._timeout)

    def _delete_subscriptions_callback(self, subscriptionids, resp_fut, data_fut):
        """
        Decode the DeleteSubscriptionsResponse held by data_fut, drop the
        publish callbacks of subscriptionids and resolve resp_fut with
        the response Results.
        """
        self.logger.info("_delete_subscriptions_callback")
        data = data_fut.result()
        response = ua.DeleteSubscriptionsResponse.from_binary(data)
        response.ResponseHeader.ServiceResult.check()
        for sid in subscriptionids:
            # tolerate ids that were never registered (or already removed)
            # so that resp_fut is still resolved below
            self._publishcallbacks.pop(sid, None)
        resp_fut.set_result(response.Results)

    def publish(self, acks=None):
        """
        Send a PublishRequest carrying the given subscription
        acknowledgements (an empty list when acks is None).

        The reply is handled by _call_publish_callback; nothing is
        returned.
        """
        self.logger.info("publish")
        if acks is None:
            acks = []
        request = ua.PublishRequest()
        request.Parameters.SubscriptionAcknowledgements = acks
        # timeout could be set to 0 but some servers do not support it
        self._uasocket.send_request(request, self._call_publish_callback, timeout=int(9e8))

    def _call_publish_callback(self, future):
        """
        Decode the PublishResponse held by future and pass its Parameters
        to the callback registered for its SubscriptionId.
        """
        self.logger.info("call_publish_callback")
        data = future.result()
        self._uasocket.check_answer(data, "ServiceFault received from server while waiting for publish response")
        try:
            response = ua.PublishResponse.from_binary(data)
        except Exception:
            self.logger.exception("Error parsing notification from server")
            # send publish request to server so it does not stop sending
            # notifications
            self.publish([])
            return
        subscription_id = response.Parameters.SubscriptionId
        if subscription_id not in self._publishcallbacks:
            self.logger.warning("Received data for unknown subscription: %s ", subscription_id)
            return
        callback = self._publishcallbacks[subscription_id]
        try:
            callback(response.Parameters)
        except Exception:  # we call client code, catch everything!
            # the %s placeholder previously had no argument
            self.logger.exception("Exception while calling user callback: %s", callback)

    def create_monitored_items(self, params):
        """
        Send a CreateMonitoredItemsRequest and return the response Results.
        """
        self.logger.info("create_monitored_items")
        request = ua.CreateMonitoredItemsRequest()
        request.Parameters = params
        return self._send_checked(request, ua.CreateMonitoredItemsResponse).Results

    def delete_monitored_items(self, params):
        """
        Send a DeleteMonitoredItemsRequest and return the response Results.
        """
        self.logger.info("delete_monitored_items")
        request = ua.DeleteMonitoredItemsRequest()
        request.Parameters = params
        return self._send_checked(request, ua.DeleteMonitoredItemsResponse).Results

    def add_nodes(self, nodestoadd):
        """
        Send an AddNodesRequest for nodestoadd and return the response
        Results.
        """
        self.logger.info("add_nodes")
        request = ua.AddNodesRequest()
        request.Parameters.NodesToAdd = nodestoadd
        return self._send_checked(request, ua.AddNodesResponse).Results

    def delete_nodes(self, nodestodelete):
        """
        Send a DeleteNodesRequest for nodestodelete and return the
        response Results.
        """
        self.logger.info("delete_nodes")
        request = ua.DeleteNodesRequest()
        request.Parameters.NodesToDelete = nodestodelete
        return self._send_checked(request, ua.DeleteNodesResponse).Results

    def call(self, methodstocall):
        """
        Send a CallRequest for methodstocall and return the response
        Results.
        """
        # logged like every other service method
        self.logger.info("call")
        request = ua.CallRequest()
        request.Parameters.MethodsToCall = methodstocall
        return self._send_checked(request, ua.CallResponse).Results

    def history_read(self, params):
        """
        Send a HistoryReadRequest and return the response Results.
        """
        self.logger.info("history_read")
        request = ua.HistoryReadRequest()
        request.Parameters = params
        return self._send_checked(request, ua.HistoryReadResponse).Results
||
| 457 |