| Total Complexity | 40 |
| Total Lines | 325 |
| Duplicated Lines | 0 % |
Complex classes like legion_network often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
| 1 | """! |
||
| 207 | class legion_network(network): |
||
| 208 | """! |
||
| 209 | @brief Local excitatory global inhibitory oscillatory network (LEGION) that uses relaxation oscillator |
||
| 210 | based on Van der Pol model. |
||
| 211 | |||
| 212 | @details The model uses global inhibitor to de-synchronize synchronous ensembles of oscillators. |
||
| 213 | |||
| 214 | Example: |
||
| 215 | @code |
||
| 216 | # Create parameters of the network |
||
| 217 | parameters = legion_parameters(); |
||
| 218 | parameters.Wt = 4.0; |
||
| 219 | |||
| 220 | # Create stimulus |
||
| 221 | stimulus = [1, 1, 0, 0, 0, 1, 1, 1]; |
||
| 222 | |||
| 223 | # Create the network (use CCORE for fast solving) |
||
| 224 | net = legion_network(len(stimulus), parameters, conn_type.GRID_FOUR, ccore = True); |
||
| 225 | |||
| 226 | # Simulate network - result of simulation is output dynamic of the network |
||
| 227 | output_dynamic = net.simulate(1000, 750, stimulus); |
||
| 228 | |||
| 229 | # Draw output dynamic |
||
| 230 | draw_dynamics(output_dynamic.time, output_dynamic.output, x_title = "Time", y_title = "x(t)"); |
||
| 231 | @endcode |
||
| 232 | |||
| 233 | """ |
||
| 234 | |||
| 235 | _name = "Local excitatory global inhibitory oscillatory network (LEGION)" |
||
| 236 | |||
| 237 | _excitatory = None; # excitatory state of each oscillator |
||
| 238 | _inhibitory = None; # inhibitory state of each oscillator |
||
| 239 | _potential = None; # potential of each oscillator |
||
| 240 | |||
| 241 | _stimulus = None; # stimulus of each oscillator |
||
| 242 | _coupling_term = None; # coupling term of each oscillator |
||
| 243 | _global_inhibitor = 0; # value of global inhibitory |
||
| 244 | |||
| 245 | _buffer_coupling_term = None; # coupling terms on previous step of simulation |
||
| 246 | _dynamic_coupling = None; # dynamic connection between oscillators |
||
| 247 | |||
| 248 | _params = None; # parameters of the network |
||
| 249 | |||
| 250 | _noise = None; # noise of each oscillator |
||
| 251 | |||
| 252 | __ccore_legion_pointer = None; |
||
| 253 | |||
| 254 | def __init__(self, num_osc, parameters = None, type_conn = conn_type.ALL_TO_ALL, type_conn_represent = conn_represent.MATRIX, ccore = False): |
||
| 255 | """! |
||
| 256 | @brief Constructor of oscillatory network LEGION (local excitatory global inhibitory oscillatory network). |
||
| 257 | |||
| 258 | @param[in] num_osc (uint): Number of oscillators in the network. |
||
| 259 | @param[in] parameters (legion_parameters): Parameters of the network that are defined by structure 'legion_parameters'. |
||
| 260 | @param[in] type_conn (conn_type): Type of connection between oscillators in the network. |
||
| 261 | @param[in] type_conn_represent (conn_represent): Internal representation of connection in the network: matrix or list. |
||
| 262 | @param[in] ccore (bool): If True then all interaction with object will be performed via CCORE library (C++ implementation of pyclustering). |
||
| 263 | |||
| 264 | """ |
||
| 265 | |||
| 266 | # set parameters of the network |
||
| 267 | if (parameters is not None): |
||
| 268 | self._params = parameters; |
||
| 269 | else: |
||
| 270 | self._params = legion_parameters(); |
||
| 271 | |||
| 272 | if (ccore is True): |
||
| 273 | self.__ccore_legion_pointer = wrapper.legion_create(num_osc, type_conn, self._params); |
||
| 274 | else: |
||
| 275 | super().__init__(num_osc, type_conn, type_conn_represent); |
||
| 276 | |||
| 277 | # initial states |
||
| 278 | self._excitatory = [ random.random() for i in range(self._num_osc) ]; |
||
| 279 | self._inhibitory = [0.0] * self._num_osc; |
||
| 280 | self._potential = [0.0] * self._num_osc; |
||
| 281 | |||
| 282 | self._coupling_term = [0.0] * self._num_osc; |
||
| 283 | self._buffer_coupling_term = [0.0] * self._num_osc; |
||
| 284 | |||
| 285 | # generate first noises |
||
| 286 | self._noise = [random.random() * self._params.ro for i in range(self._num_osc)]; |
||
| 287 | |||
| 288 | |||
| 289 | def __create_stimulus(self, stimulus): |
||
| 290 | """! |
||
| 291 | @brief Create stimulus for oscillators in line with stimulus map and parameters. |
||
| 292 | |||
| 293 | @param[in] stimulus (list): Stimulus for oscillators that is represented by list, number of stimulus should be equal number of oscillators. |
||
| 294 | |||
| 295 | """ |
||
| 296 | |||
| 297 | if (len(stimulus) != self._num_osc): |
||
| 298 | raise NameError("Number of stimulus should be equal number of oscillators in the network.");
|
||
| 299 | else: |
||
| 300 | self._stimulus = []; |
||
| 301 | |||
| 302 | for val in stimulus: |
||
| 303 | if (val > 0): self._stimulus.append(self._params.I); |
||
| 304 | else: self._stimulus.append(0); |
||
| 305 | |||
| 306 | |||
| 307 | def __create_dynamic_connections(self): |
||
| 308 | """! |
||
| 309 | @brief Create dynamic connection in line with input stimulus. |
||
| 310 | |||
| 311 | """ |
||
| 312 | |||
| 313 | if (self._stimulus is None): |
||
| 314 | raise NameError("Stimulus should initialed before creation of the dynamic connections in the network.");
|
||
| 315 | |||
| 316 | self._dynamic_coupling = [ [0] * self._num_osc for i in range(self._num_osc)]; |
||
| 317 | |||
| 318 | for i in range(self._num_osc): |
||
| 319 | neighbors = self.get_neighbors(i); |
||
| 320 | |||
| 321 | if ( (len(neighbors) > 0) and (self._stimulus[i] > 0) ): |
||
| 322 | number_stimulated_neighbors = 0.0; |
||
| 323 | for j in neighbors: |
||
| 324 | if (self._stimulus[j] > 0): |
||
| 325 | number_stimulated_neighbors += 1.0; |
||
| 326 | |||
| 327 | if (number_stimulated_neighbors > 0): |
||
| 328 | dynamic_weight = self._params.Wt / number_stimulated_neighbors; |
||
| 329 | |||
| 330 | for j in neighbors: |
||
| 331 | self._dynamic_coupling[i][j] = dynamic_weight; |
||
| 332 | |||
| 333 | |||
| 334 | def simulate(self, steps, time, stimulus, solution = solve_type.RK4, collect_dynamic = True): |
||
| 335 | """! |
||
| 336 | @brief Performs static simulation of LEGION oscillatory network. |
||
| 337 | |||
| 338 | @param[in] steps (uint): Number steps of simulations during simulation. |
||
| 339 | @param[in] time (double): Time of simulation. |
||
| 340 | @param[in] stimulus (list): Stimulus for oscillators, number of stimulus should be equal to number of oscillators, |
||
| 341 | example of stimulus for 5 oscillators [0, 0, 1, 1, 0], value of stimulus is defined by parameter 'I'. |
||
| 342 | @param[in] solution (solve_type): Type of solution (solving). |
||
| 343 | @param[in] collect_dynamic (bool): If True - returns whole dynamic of oscillatory network, otherwise returns only last values of dynamics. |
||
| 344 | |||
| 345 | @return (list) Dynamic of oscillatory network. If argument 'collect_dynamic' = True, than return dynamic for the whole simulation time, |
||
| 346 | otherwise returns only last values (last step of simulation) of dynamic. |
||
| 347 | |||
| 348 | """ |
||
| 349 | |||
| 350 | if (self.__ccore_legion_pointer is not None): |
||
| 351 | pointer_dynamic = wrapper.legion_simulate(self.__ccore_legion_pointer, steps, time, solution, collect_dynamic, stimulus); |
||
| 352 | return legion_dynamic(None, None, None, pointer_dynamic); |
||
| 353 | |||
| 354 | # Check solver before simulation |
||
| 355 | if (solution == solve_type.FAST): |
||
| 356 | raise NameError("Solver FAST is not support due to low accuracy that leads to huge error.");
|
||
| 357 | |||
| 358 | elif (solution == solve_type.RKF45): |
||
| 359 | raise NameError("Solver RKF45 is not support in python version. RKF45 is supported in CCORE implementation.");
|
||
| 360 | |||
| 361 | # set stimulus |
||
| 362 | self.__create_stimulus(stimulus); |
||
| 363 | |||
| 364 | # calculate dynamic weights |
||
| 365 | self.__create_dynamic_connections(); |
||
| 366 | |||
| 367 | dyn_exc = None; |
||
| 368 | dyn_time = None; |
||
| 369 | dyn_ginh = None; |
||
| 370 | |||
| 371 | # Store only excitatory of the oscillator |
||
| 372 | if (collect_dynamic == True): |
||
| 373 | dyn_exc = []; |
||
| 374 | dyn_time = []; |
||
| 375 | dyn_ginh = []; |
||
| 376 | |||
| 377 | step = time / steps; |
||
| 378 | int_step = step / 10.0; |
||
| 379 | |||
| 380 | for t in numpy.arange(step, time + step, step): |
||
| 381 | # update states of oscillators |
||
| 382 | self._calculate_states(solution, t, step, int_step); |
||
| 383 | |||
| 384 | # update states of oscillators |
||
| 385 | if (collect_dynamic == True): |
||
| 386 | dyn_exc.append(self._excitatory); |
||
| 387 | dyn_time.append(t); |
||
| 388 | dyn_ginh.append(self._global_inhibitor); |
||
| 389 | else: |
||
| 390 | dyn_exc = self._excitatory; |
||
| 391 | dyn_time = t; |
||
| 392 | dyn_ginh = self._global_inhibitor; |
||
| 393 | |||
| 394 | return legion_dynamic(dyn_exc, dyn_ginh, dyn_time); |
||
| 395 | |||
| 396 | |||
| 397 | def _calculate_states(self, solution, t, step, int_step): |
||
| 398 | """! |
||
| 399 | @brief Calculates new state of each oscillator in the network. |
||
| 400 | |||
| 401 | @param[in] solution (solve_type): Type solver of the differential equation. |
||
| 402 | @param[in] t (double): Current time of simulation. |
||
| 403 | @param[in] step (double): Step of solution at the end of which states of oscillators should be calculated. |
||
| 404 | @param[in] int_step (double): Step differentiation that is used for solving differential equation. |
||
| 405 | |||
| 406 | """ |
||
| 407 | |||
| 408 | next_excitatory = [0.0] * self._num_osc; |
||
| 409 | next_inhibitory = [0.0] * self._num_osc; |
||
| 410 | |||
| 411 | next_potential = []; |
||
| 412 | if (self._params.ENABLE_POTENTIONAL is True): |
||
| 413 | next_potential = [0.0] * self._num_osc; |
||
| 414 | |||
| 415 | # Update states of oscillators |
||
| 416 | for index in range (0, self._num_osc, 1): |
||
| 417 | if (self._params.ENABLE_POTENTIONAL is True): |
||
| 418 | result = odeint(self._legion_state, [self._excitatory[index], self._inhibitory[index], self._potential[index]], numpy.arange(t - step, t, int_step), (index , )); |
||
| 419 | [ next_excitatory[index], next_inhibitory[index], next_potential[index] ] = result[len(result) - 1][0:3]; |
||
| 420 | |||
| 421 | else: |
||
| 422 | result = odeint(self._legion_state_simplify, [self._excitatory[index], self._inhibitory[index] ], numpy.arange(t - step, t, int_step), (index , )); |
||
| 423 | [ next_excitatory[index], next_inhibitory[index] ] = result[len(result) - 1][0:2]; |
||
| 424 | |||
| 425 | # Update coupling term |
||
| 426 | neighbors = self.get_neighbors(index); |
||
| 427 | |||
| 428 | coupling = 0 |
||
| 429 | for index_neighbor in neighbors: |
||
| 430 | coupling += self._dynamic_coupling[index][index_neighbor] * heaviside(self._excitatory[index_neighbor] - self._params.teta_x); |
||
| 431 | |||
| 432 | self._buffer_coupling_term[index] = coupling - self._params.Wz * heaviside(self._global_inhibitor - self._params.teta_xz); |
||
| 433 | |||
| 434 | # Update state of global inhibitory |
||
| 435 | result = odeint(self._global_inhibitor_state, self._global_inhibitor, numpy.arange(t - step, t, int_step), (None, )); |
||
| 436 | self._global_inhibitor = result[len(result) - 1][0]; |
||
| 437 | |||
| 438 | self._noise = [random.random() * self._params.ro for i in range(self._num_osc)]; |
||
| 439 | self._coupling_term = self._buffer_coupling_term[:]; |
||
| 440 | self._inhibitory = next_inhibitory[:]; |
||
| 441 | self._excitatory = next_excitatory[:]; |
||
| 442 | |||
| 443 | if (self._params.ENABLE_POTENTIONAL is True): |
||
| 444 | self._potential = next_potential[:]; |
||
| 445 | |||
| 446 | |||
| 447 | |||
| 448 | def _global_inhibitor_state(self, z, t, argv): |
||
| 449 | """! |
||
| 450 | @brief Returns new value of global inhibitory |
||
| 451 | |||
| 452 | @param[in] z (dobule): Current value of inhibitory. |
||
| 453 | @param[in] t (double): Current time of simulation. |
||
| 454 | @param[in] argv (tuple): It's not used, can be ignored. |
||
| 455 | |||
| 456 | @return (double) New value if global inhibitory (not assign). |
||
| 457 | |||
| 458 | """ |
||
| 459 | |||
| 460 | sigma = 0.0; |
||
| 461 | |||
| 462 | for x in self._excitatory: |
||
| 463 | if (x > self._params.teta_zx): |
||
| 464 | sigma = 1.0; |
||
| 465 | break; |
||
| 466 | |||
| 467 | return self._params.fi * (sigma - z); |
||
| 468 | |||
| 469 | |||
| 470 | def _legion_state_simplify(self, inputs, t, argv): |
||
| 471 | """! |
||
| 472 | @brief Returns new values of excitatory and inhibitory parts of oscillator of oscillator. |
||
| 473 | @details Simplify model doesn't consider oscillator potential. |
||
| 474 | |||
| 475 | @param[in] inputs (list): Initial values (current) of oscillator [excitatory, inhibitory]. |
||
| 476 | @param[in] t (double): Current time of simulation. |
||
| 477 | @param[in] argv (uint): Extra arguments that are not used for integration - index of oscillator. |
||
| 478 | |||
| 479 | @return (list) New values of excitatoty and inhibitory part of oscillator (not assign). |
||
| 480 | |||
| 481 | """ |
||
| 482 | |||
| 483 | index = argv; |
||
| 484 | |||
| 485 | x = inputs[0]; # excitatory |
||
| 486 | y = inputs[1]; # inhibitory |
||
| 487 | |||
| 488 | dx = 3.0 * x - x ** 3.0 + 2.0 - y + self._stimulus[index] + self._coupling_term[index] + self._noise[index]; |
||
| 489 | dy = self._params.eps * (self._params.gamma * (1.0 + math.tanh(x / self._params.betta)) - y); |
||
| 490 | |||
| 491 | neighbors = self.get_neighbors(index); |
||
| 492 | potential = 0.0; |
||
| 493 | |||
| 494 | for index_neighbor in neighbors: |
||
| 495 | potential += self._params.T * heaviside(self._excitatory[index_neighbor] - self._params.teta_x); |
||
| 496 | |||
| 497 | return [dx, dy]; |
||
| 498 | |||
| 499 | |||
| 500 | def _legion_state(self, inputs, t, argv): |
||
| 501 | """! |
||
| 502 | @brief Returns new values of excitatory and inhibitory parts of oscillator and potential of oscillator. |
||
| 503 | |||
| 504 | @param[in] inputs (list): Initial values (current) of oscillator [excitatory, inhibitory, potential]. |
||
| 505 | @param[in] t (double): Current time of simulation. |
||
| 506 | @param[in] argv (uint): Extra arguments that are not used for integration - index of oscillator. |
||
| 507 | |||
| 508 | @return (list) New values of excitatoty and inhibitory part of oscillator and new value of potential (not assign). |
||
| 509 | |||
| 510 | """ |
||
| 511 | |||
| 512 | index = argv; |
||
| 513 | |||
| 514 | x = inputs[0]; # excitatory |
||
| 515 | y = inputs[1]; # inhibitory |
||
| 516 | p = inputs[2]; # potential |
||
| 517 | |||
| 518 | potential_influence = heaviside(p + math.exp(-self._params.alpha * t) - self._params.teta); |
||
| 519 | |||
| 520 | dx = 3.0 * x - x ** 3.0 + 2.0 - y + self._stimulus[index] * potential_influence + self._coupling_term[index] + self._noise[index]; |
||
| 521 | dy = self._params.eps * (self._params.gamma * (1.0 + math.tanh(x / self._params.betta)) - y); |
||
| 522 | |||
| 523 | neighbors = self.get_neighbors(index); |
||
| 524 | potential = 0.0; |
||
| 525 | |||
| 526 | for index_neighbor in neighbors: |
||
| 527 | potential += self._params.T * heaviside(self._excitatory[index_neighbor] - self._params.teta_x); |
||
| 528 | |||
| 529 | dp = self._params.lamda * (1.0 - p) * heaviside(potential - self._params.teta_p) - self._params.mu * p; |
||
| 530 | |||
| 531 | return [dx, dy, dp]; |
||
| 532 |
This can be caused by one of the following:
1. Missing Dependencies
This error could indicate a configuration issue of Pylint. Make sure that your libraries are available by adding the necessary commands.
2. Missing __init__.py files
This error could also result from missing
__init__.pyfiles in your module folders. Make sure that you place one file in each sub-folder.