| Total Complexity | 40 |
| Total Lines | 293 |
| Duplicated Lines | 10.24 % |
| Changes | 0 | ||
Duplicate code is one of the most pungent code smells. A common rule of thumb is to restructure code once it is duplicated in three or more places.
Common duplication problems and their corresponding solutions are:
Complex classes like CameraThorlabs often do many different things. To break such a class down, we need to identify a cohesive component within it. A common way to find such a component is to look for fields or methods that share the same prefix or suffix.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
| 1 | # -*- coding: utf-8 -*- |
||
class CameraThorlabs(Base, CameraInterface):
    """
    Hardware module driving a Thorlabs camera through the uc480 / uEye DLL.

    The DLL is loaded with ctypes on activation, the camera is opened and then
    configured for 8 bit monochrome, full frame, un-binned acquisition.
    Most DLL calls return a status code that is compared against IS_SUCCESS
    by _check_error(); failures are logged, not raised.
    """

    _modtype = 'camera'
    _modclass = 'hardware'

    _default_exposure = ConfigOption('default_exposure', 0.1)
    # NOTE(review): the config key carries a leading underscore, unlike the
    # two other options - looks unintended, but changing it would break
    # existing configuration files. Confirm before renaming.
    _default_gain = ConfigOption('_default_gain', 1.0)
    _id_camera = ConfigOption('id_camera', 0)  # if more than one camera is present

    _dll = None
    _camera_handle = None
    # NOTE(review): these two are bound to the ConfigOption objects above at
    # class creation; presumably the module framework resolves them to the
    # configured values on instantiation - verify.
    _exposure = _default_exposure
    _gain = _default_gain
    _width = 0
    _height = 0
    _pos_x = 0
    _pos_y = 0
    _bit_depth = 0
    _cam = None
    _acquiring = False
    _live = False
    _last_acquisition_mode = None  # useful if config changes during acq
    _sensor_info = None

    _image_memory = None
    _image_pid = None

    def on_activate(self):
        """ Initialisation performed during activation of the module.

        Loads the DLL, connects to the camera and applies the default
        camera settings, in that order.
        """
        # Load the dll if present
        self._load_dll()
        self._connect_camera()
        self._init_camera()

    def _check_error(self, code, message):
        """
        Check that the code means OK and log message as error if not.

        @param code: status code returned by a DLL call
        @param str message: text logged as error when code is not IS_SUCCESS

        @return bool: True if OK, False otherwise.
        """
        if code != IS_SUCCESS:
            self.log.error(message)
            return False
        return True

    def _check_int_range(self, value, mini, maxi, message):
        """
        Check that value is in the range [mini, maxi] and log message as error if not.

        @param int value: value to test
        @param int mini: lowest accepted value (inclusive)
        @param int maxi: highest accepted value (inclusive)
        @param str message: prefix of the error that is logged on failure

        @return bool: True if OK, False otherwise.
        """
        if mini <= value <= maxi:
            return True
        self.log.error('{} - Value {} must be between {} and {}'.format(message, value, mini, maxi))
        return False

    def _resolve_int_setting(self, value, current, mini, maxi, message):
        """
        Turn an optional user supplied setting into the integer to apply.

        @param value: requested value, or None to keep the current one
        @param int current: value used when `value` is None (not re-validated)
        @param int mini: lowest accepted value (inclusive)
        @param int maxi: highest accepted value (inclusive)
        @param str message: prefix of the error that is logged on failure

        @return int: the value to apply, or None if `value` is out of range
                     (the error has then already been logged).
        """
        if value is None:
            return current
        value = int(value)
        if not self._check_int_range(value, mini, maxi, message):
            return None
        return value

    def _load_dll(self):
        """
        Load the dll for the camera

        The library is chosen from the operating system and, on Windows, the
        interpreter architecture. Problems are logged; self._dll is left
        untouched in that case.
        """
        try:
            system = platform.system()
            if system == "Windows":
                if platform.architecture()[0] == "64bit":
                    self._dll = ctypes.cdll.uc480_64
                else:
                    self._dll = ctypes.cdll.uc480
            # for Linux
            elif system == "Linux":
                self._dll = ctypes.cdll.LoadLibrary('libueye_api.so')
            else:
                self.log.error("Can not detect operating system to load Thorlabs DLL.")
        except OSError:
            self.log.error("Can not load Thorlabs DLL.")

    def _connect_camera(self):
        """
        Connect to the camera and get basic info on it

        On success self._camera_handle and self._sensor_info are filled.
        If no camera is found, or the configured id is too high, an error is
        logged and both attributes are left untouched.
        """
        number_of_cameras = ctypes.c_int(0)
        self._dll.is_GetNumberOfCameras(ctypes.byref(number_of_cameras))
        if number_of_cameras.value < 1:
            self.log.error("No Thorlabs camera detected.")
        elif number_of_cameras.value - 1 < self._id_camera:
            self.log.error("A Thorlabs camera has been detected but the id specified above the number of camera(s)")
        else:
            # NOTE(review): the handle is always initialised with 0, the
            # configured id_camera is only range checked above - confirm
            # whether it should be passed to is_InitCamera.
            self._camera_handle = ctypes.c_int(0)
            ret = self._dll.is_InitCamera(ctypes.pointer(self._camera_handle))
            self._check_error(ret, "Could not initialize camera")
            self._sensor_info = SENSORINFO()
            self._dll.is_GetSensorInfo(self._camera_handle, ctypes.byref(self._sensor_info))
            self.log.debug('Connected to camera : {}'.format(str(self._sensor_info.strSensorName)))

    def _init_camera(self):
        """
        Set the parameters of the camera for our usage

        Every step logs an error on failure and the sequence carries on
        regardless.
        """
        # Color mode
        code = self._dll.is_SetColorMode(self._camera_handle, ctypes.c_int(IS_SET_CM_Y8))
        self._check_error(code, "Could not set color mode IS_SET_CM_Y8")
        self._bit_depth = 8
        # Image size
        self.set_image_size(self._sensor_info.nMaxWidth, self._sensor_info.nMaxHeight)
        # Image position
        self.set_image_position(0, 0)
        # Binning
        code = self._dll.is_SetBinning(self._camera_handle, ctypes.c_int(0))  # Disable binning
        self._check_error(code, "Could not set binning disabled")
        # Sub sampling
        code = self._dll.is_SetSubSampling(self._camera_handle, ctypes.c_int(0))  # Disable sub sampling
        self._check_error(code, "Could not set sub sampling disabled")
        # Allocate image memory
        self._image_pid = ctypes.c_int()
        self._image_memory = ctypes.c_char_p()
        code = self._dll.is_AllocImageMem(
            self._camera_handle, self._width, self._height,
            self._bit_depth, ctypes.byref(self._image_memory), ctypes.byref(self._image_pid))
        self._check_error(code, "Could not allocate image memory")
        # Set image memory
        code = self._dll.is_SetImageMem(self._camera_handle, self._image_memory, self._image_pid)
        self._check_error(code, "Could not set image memory")
        # Set auto exit
        code = self._dll.is_EnableAutoExit(self._camera_handle, 1)  # Enable auto-exit
        self._check_error(code, "Could not set auto exit")

        self.set_exposure(self._exposure)
        self.set_gain(self._gain)

    def set_image_size(self, width=None, height=None):
        """
        Set the size of the image, here the camera will acquire only part of the image from a given position

        @param width: new width in pixel, None keeps the current width
        @param height: new height in pixel, None keeps the current height

        @return bool: True if the camera accepted the size. False if a value
                      is outside [1, sensor maximum] (nothing is changed then)
                      or if the DLL call failed.
        """
        width = self._resolve_int_setting(
            width, self._width, 1, self._sensor_info.nMaxWidth, 'Can not set image width')
        height = self._resolve_int_setting(
            height, self._height, 1, self._sensor_info.nMaxHeight, 'Can not set image height')
        # Both values are checked before either is stored so a rejected
        # request leaves the cached size consistent.
        if width is None or height is None:
            return False
        self._width = width
        self._height = height

        code = self._dll.is_SetImageSize(self._camera_handle, ctypes.c_int(self._width), ctypes.c_int(self._height))
        return self._check_error(code, "Could not set image size")

    def set_image_position(self, pos_x, pos_y):
        """
        Set image position reference coordinate

        @param pos_x: new x position in pixel, None keeps the current one
        @param pos_y: new y position in pixel, None keeps the current one

        @return bool: True if the camera accepted the position. False if a
                      value is outside [0, sensor maximum - 1] (nothing is
                      changed then) or if the DLL call failed.
        """
        pos_x = self._resolve_int_setting(
            pos_x, self._pos_x, 0, self._sensor_info.nMaxWidth-1, 'Can not set image position x')
        pos_y = self._resolve_int_setting(
            pos_y, self._pos_y, 0, self._sensor_info.nMaxHeight-1, 'Can not set image position y')
        if pos_x is None or pos_y is None:
            return False
        self._pos_x = pos_x
        self._pos_y = pos_y

        code = self._dll.is_SetImagePos(self._camera_handle, ctypes.c_int(self._pos_x), ctypes.c_int(self._pos_y))
        return self._check_error(code, "Could not set image position")

    def on_deactivate(self):
        """
        Deinitialisation performed during deactivation of the module.

        Closes the camera through the DLL and clears the acquisition flags.
        """
        self._dll.is_ExitCamera(self._camera_handle)
        self._acquiring = False
        self._live = False

    def get_name(self):
        """
        Return a name for the camera

        @return: the strSensorName field of the sensor info structure
        """
        return self._sensor_info.strSensorName

    def get_size(self):
        """
        Return the current size of the acquired image

        @return tuple: (width, height) in pixel as last set by set_image_size
        """
        return self._width, self._height

    def support_live_acquisition(self):
        """
        Return whether or not this camera support live acquisition

        @return bool: always True
        """
        return True

    def start_live_acquisition(self):
        """
        Set the camera in live mode

        @return bool: True if live video has been started, False if the
                      camera was not ready or the DLL reported an error.
        """
        if not self.get_ready_state():
            return False

        self._acquiring = True
        self._live = True
        code = self._dll.is_CaptureVideo(self._camera_handle, ctypes.c_int(IS_DONT_WAIT))
        if self._check_error(code, "Could not start live acquisition"):
            return True
        # Starting failed, roll the flags back
        self._acquiring = False
        self._live = False
        return False

    def start_single_acquisition(self):
        """
        Start the acquisition of a single image

        @return bool: True if the DLL call succeeded, False if the camera
                      was not ready or the DLL reported an error.
        """
        if not self.get_ready_state():
            return False

        self._acquiring = True
        try:
            code = self._dll.is_FreezeVideo(self._camera_handle, ctypes.c_int(IS_WAIT))
        finally:
            # Never leave the module flagged as acquiring, even if the call raises
            self._acquiring = False
        return self._check_error(code, "Could not start single acquisition")

    def stop_acquisition(self):
        """
        Stop live acquisition

        @return bool: False only if an acquisition was running and the DLL
                      failed to stop it, True otherwise.
        """
        no_error = True
        if self._acquiring:
            code = self._dll.is_StopLiveVideo(self._camera_handle, ctypes.c_int(IS_FORCE_VIDEO_STOP))
            no_error = self._check_error(code, "Could not stop acquisition")
            self._acquiring = False
            self._live = False
        return no_error

    def get_acquired_data(self):
        """
        Return last acquired data from the dll

        @return numpy.ndarray: 2d float array of shape (height, width). The
                               raw 8 bit values are converted to float but
                               not rescaled. A failed copy is only logged.
        """
        # Allocate memory for image:
        img_size = self._width * self._height
        c_img = (ctypes.c_char * img_size)()
        # copy camera memory to accessible memory
        code = self._dll.is_CopyImageMem(self._camera_handle, self._image_memory, self._image_pid, c_img)
        self._check_error(code, "Could not copy image to memory")
        # Convert the raw bytes to a numpy 2d array of float
        img_array = np.frombuffer(c_img, dtype=np.uint8).astype(float)
        return img_array.reshape(self._height, self._width)

    def get_bit_depth(self):
        """
        Return the bit depth of the image

        @return int: bit depth, set to 8 by _init_camera
        """
        return self._bit_depth

    def set_exposure(self, time):
        """
        Set the exposure in second

        @param float time: requested exposure time in second

        @return float: the exposure in second now cached by the module: the
                       value reported back by the DLL on success, the
                       previous value if the DLL call failed.
        """
        exp = ctypes.c_double(time * 1e3)  # in ms
        new_exp = ctypes.c_double(0)
        code = self._dll.is_SetExposureTime(self._camera_handle, exp, ctypes.byref(new_exp))
        if self._check_error(code, "Could not set exposure"):
            # Only trust the read-back value when the call succeeded
            self._exposure = float(new_exp.value)/1000  # back to second
        return self._exposure

    def get_exposure(self):
        """
        Return current exposure

        @return float: cached exposure time in second
        """
        return self._exposure

    def get_ready_state(self):
        """
        Return whether or not the camera is ready for an acquisition

        @return bool: True only if the module state is 'idle' and no
                      acquisition is flagged as running.
        """
        if self.module_state() != 'idle':
            return False
        return not self._acquiring

    def set_gain(self, gain):
        """
        Set the gain

        NOTE(review): not implemented - the requested gain is neither sent to
        the camera nor stored, and nothing is returned.
        """
        pass

    def get_gain(self):
        """
        Get the gain

        @return: the cached gain value (never changed by set_gain)
        """
        return self._gain
||
| 331 | |||
| 332 |