Passed
Pull Request — master (#206)
by Grega
01:29
created

NiaPy.task.Task.Task.__init__()   C

Complexity

Conditions 10

Size

Total Lines 41
Code Lines 18

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 10
eloc 18
nop 8
dl 0
loc 41
rs 5.9999
c 0
b 0
f 0

How to fix   Complexity    Many Parameters   

Complexity

Complex classes like NiaPy.task.Task.Task.__init__() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

Many Parameters

Methods with many parameters are not only hard to understand, but their parameters also often become inconsistent when you need more, or different data.

There are several approaches to avoid long parameter lists:

1
from enum import Enum
2
3
from matplotlib import pyplot as plt, animation as anim
4
from numpy import inf, random as rand, asarray
5
from numpy.core.multiarray import ndarray, dot
6
from numpy.core.umath import fabs
7
8
from NiaPy.util import Utility, limit_repair, fullArray, FesException, GenException, RefException
9
10
11
__all__ = [
12
    "OptimizationType",
13
    "Task",
14
    "CountingTask",
15
    "StoppingTask",
16
    "MoveTask",
17
    "TaskComposition",
18
    "TaskConvPlot",
19
    "TaskConvPrint",
20
    "TaskConvSave",
21
    "ThrowingTask",
22
    "ScaledTask"
23
]
24
25
26
class OptimizationType(Enum):
27
    r"""Enum representing type of optimization.
28
29
    Attributes:
30
            MINIMIZATION (int): Represents minimization problems and is default optimization type of all algorithms.
31
            MAXIMIZATION (int): Represents maximization problems.
32
    """
33
    MINIMIZATION = 1.0
34
    MAXIMIZATION = -1.0
35
36
37
class Task:
38
    r"""Class representing problem to solve with optimization.
39
40
    Date:
41
            2019
42
43
    Author:
44
            Klemen Berkovič
45
46
    Attributes:
47
            D (int): Dimension of the problem.
48
            Lower (numpy.ndarray): Lower bounds of the problem.
49
            Upper (numpy.ndarray): Upper bounds of the problem.
50
            bRange (numpy.ndarray): Search range between upper and lower limits.
51
            optType (OptimizationType): Optimization type to use.
52
53
    See Also:
54
            * :class:`NiaPy.util.Utility`
55
    """
56
    D = 0
57
    benchmark = None
58
    Lower, Upper, bRange = inf, inf, inf
59
    optType = OptimizationType.MINIMIZATION
60
61
    def __init__(self, D=0, optType=OptimizationType.MINIMIZATION, benchmark=None, Lower=None, Upper=None, frepair=limit_repair, **kwargs):
62
        r"""Initialize task class for optimization.
63
64
        Arguments:
65
                D (Optional[int]): Number of dimensions.
66
                optType (Optional[OptimizationType]): Set the type of optimization.
67
                benchmark (Union[str, Benchmark]): Problem to solve with optimization.
68
                Lower (Optional[numpy.ndarray]): Lower limits of the problem.
69
                Upper (Optional[numpy.ndarray]): Upper limits of the problem.
70
                frepair (Optional[Callable[[numpy.ndarray, numpy.ndarray, numpy.ndarray, Dict[str, Any]], numpy.ndarray]]): Function for reparing individuals components to desired limits.
71
72
        See Also:
73
                * `func`:NiaPy.util.Utility.__init__`
74
                * `func`:NiaPy.util.Utility.repair`
75
        """
76
        # dimension of the problem
77
        self.D = D
78
        # set optimization type
79
        self.optType = optType
80
        # set optimization function
81
        self.benchmark = Utility().get_benchmark(benchmark) if benchmark is not None else None
82
        if self.benchmark is not None:
83
            self.Fun = self.benchmark.function() if self.benchmark is not None else None
84
        # set Lower limits
85
        if Lower is not None:
86
            self.Lower = fullArray(Lower, self.D)
87
        elif Lower is None and benchmark is not None:
88
            self.Lower = fullArray(self.benchmark.Lower, self.D)
89
        else:
90
            self.Lower = fullArray(0, self.D)
91
        # set Upper limits
92
        if Upper is not None:
93
            self.Upper = fullArray(Upper, self.D)
94
        elif Upper is None and benchmark is not None:
95
            self.Upper = fullArray(self.benchmark.Upper, self.D)
96
        else:
97
            self.Upper = fullArray(0, self.D)
98
        # set range
99
        self.bRange = self.Upper - self.Lower
100
        # set repair function
101
        self.frepair = frepair
102
103
    def dim(self):
104
        r"""Get the number of dimensions.
105
106
        Returns:
107
                int: Dimension of problem optimizing.
108
        """
109
        return self.D
110
111
    def bcLower(self):
112
        r"""Get the array of lower bound constraint.
113
114
        Returns:
115
                numpy.ndarray: Lower bound.
116
        """
117
        return self.Lower
118
119
    def bcUpper(self):
120
        r"""Get the array of upper bound constraint.
121
122
        Returns:
123
                numpy.ndarray: Upper bound.
124
        """
125
        return self.Upper
126
127
    def bcRange(self):
128
        r"""Get the range of bound constraint.
129
130
        Returns:
131
                numpy.ndarray: Range between lower and upper bound.
132
        """
133
        return self.Upper - self.Lower
134
135
    def repair(self, x, rnd=rand):
136
        r"""Repair solution and put the solution in the random position inside of the bounds of problem.
137
138
        Arguments:
139
                x (numpy.ndarray): Solution to check and repair if needed.
140
                rnd (mtrand.RandomState): Random number generator.
141
142
        Returns:
143
                numpy.ndarray: Fixed solution.
144
145
        See Also:
146
                * :func:`NiaPy.util.limitRepair`
147
                * :func:`NiaPy.util.limitInversRepair`
148
                * :func:`NiaPy.util.wangRepair`
149
                * :func:`NiaPy.util.randRepair`
150
                * :func:`NiaPy.util.reflectRepair`
151
        """
152
        return self.frepair(x, self.Lower, self.Upper, rnd=rnd)
153
154
    def nextIter(self):
155
        r"""Increments the number of algorithm iterations."""
156
157
    def start(self):
158
        r"""Start stopwatch."""
159
160
    def eval(self, A):
161
        r"""Evaluate the solution A.
162
163
        Arguments:
164
                A (numpy.ndarray): Solution to evaluate.
165
166
        Returns:
167
                float: Fitness/function values of solution.
168
        """
169
        return self.Fun(self.D, A) * self.optType.value
170
171
    def isFeasible(self, A):
172
        r"""Check if the solution is feasible.
173
174
        Arguments:
175
                A (Union[numpy.ndarray, Individual]): Solution to check for feasibility.
176
177
        Returns:
178
                bool: `True` if solution is in feasible space else `False`.
179
        """
180
        return False not in (A >= self.Lower) and False not in (A <= self.Upper)
181
182
    def stopCond(self):
183
        r"""Check if optimization task should stop.
184
185
        Returns:
186
                bool: `True` if stopping condition is meet else `False`.
187
        """
188
        return False
189
190
191
class CountingTask(Task):
192
    r"""Optimization task with added counting of function evaluations and algorithm iterations/generations.
193
194
    Attributes:
195
            Iters (int): Number of algorithm iterations/generations.
196
            Evals (int): Number of function evaluations.
197
198
    See Also:
199
            * :class:`NiaPy.util.Task`
200
    """
201
202
    def __init__(self, **kwargs):
203
        r"""Initialize counting task.
204
205
        Args:
206
                **kwargs (Dict[str, Any]): Additional arguments.
207
208
        See Also:
209
                * :func:`NiaPy.util.Task.__init__`
210
        """
211
        Task.__init__(self, **kwargs)
212
        self.Iters, self.Evals = 0, 0
213
214
    def eval(self, A):
215
        r"""Evaluate the solution A.
216
217
        This function increments function evaluation counter `self.Evals`.
218
219
        Arguments:
220
                A (numpy.ndarray): Solutions to evaluate.
221
222
        Returns:
223
                float: Fitness/function values of solution.
224
225
        See Also:
226
                * :func:`NiaPy.util.Task.eval`
227
        """
228
        r = Task.eval(self, A)
229
        self.Evals += 1
230
        return r
231
232
    def evals(self):
233
        r"""Get the number of evaluations made.
234
235
        Returns:
236
                int: Number of evaluations made.
237
        """
238
        return self.Evals
239
240
    def iters(self):
241
        r"""Get the number of algorithm iteratins made.
242
243
        Returns:
244
                int: Number of generations/iterations made by algorithm.
245
        """
246
        return self.Iters
247
248
    def nextIter(self):
249
        r"""Increases the number of algorithm iterations made.
250
251
        This function increments number of algorithm iterations/generations counter `self.Iters`.
252
        """
253
        self.Iters += 1
254
255
256
class StoppingTask(CountingTask):
257
    r"""Optimization task with implemented checking for stopping criterias.
258
259
    Attributes:
260
            nGEN (int): Maximum number of algorithm iterations/generations.
261
            nFES (int): Maximum number of function evaluations.
262
            refValue (float): Reference function/fitness values to reach in optimization.
263
            x (numpy.ndarray): Best found individual.
264
            x_f (float): Best found individual function/fitness value.
265
266
    See Also:
267
            * :class:`NiaPy.util.CountingTask`
268
    """
269
270
    def __init__(self, nFES=inf, nGEN=inf, refValue=None, **kwargs):
271
        r"""Initialize task class for optimization.
272
273
        Arguments:
274
                nFES (Optional[int]): Number of function evaluations.
275
                nGEN (Optional[int]): Number of generations or iterations.
276
                refValue (Optional[float]): Reference value of function/fitness function.
277
278
        See Also:
279
                * :func:`NiaPy.util.CountingTask.__init__`
280
        """
281
        CountingTask.__init__(self, **kwargs)
282
        self.refValue = (-inf if refValue is None else refValue)
283
        self.x, self.x_f = None, inf
284
        self.nFES, self.nGEN = nFES, nGEN
285
286
    def eval(self, A):
287
        r"""Evaluate solution.
288
289
        Args:
290
                A (numpy.ndarray): Solution to evaluate.
291
292
        Returns:
293
                float: Fitness/function value of solution.
294
295
        See Also:
296
                * :func:`NiaPy.util.StoppingTask.stopCond`
297
                * :func:`NiaPy.util.CountingTask.eval`
298
        """
299
        if self.stopCond():
300
            return inf * self.optType.value
301
        x_f = CountingTask.eval(self, A)
302
        if x_f < self.x_f:
303
            self.x_f = x_f
304
        return x_f
305
306
    def stopCond(self):
307
        r"""Check if stopping condition reached.
308
309
        Returns:
310
                bool: `True` if number of function evaluations or number of algorithm iterations/generations or reference values is reach else `False`
311
        """
312
        return (self.Evals >= self.nFES) or (self.Iters >= self.nGEN) or (self.refValue > self.x_f)
313
314
    def stopCondI(self):
315
        r"""Check if stopping condition reached and increase number of iterations.
316
317
        Returns:
318
                bool: `True` if number of function evaluations or number of algorithm iterations/generations or reference values is reach else `False`.
319
320
        See Also:
321
                * :func:`NiaPy.util.StoppingTask.stopCond`
322
                * :func:`NiaPy.util.CountingTask.nextIter`
323
        """
324
        r = self.stopCond()
325
        CountingTask.nextIter(self)
326
        return r
327
328
329
class ThrowingTask(StoppingTask):
330
    r"""Task that throw exceptions when stopping condition is meet.
331
332
    See Also:
333
            * :class:`NiaPy.util.StoppingTask`
334
    """
335
336
    def __init__(self, **kwargs):
337
        r"""Initialize optimization task.
338
339
        Args:
340
                **kwargs (Dict[str, Any]): Additional arguments.
341
342
        See Also:
343
                * :func:`NiaPy.util.StoppingTask.__init__`
344
        """
345
        StoppingTask.__init__(self, **kwargs)
346
347
    def stopCondE(self):
348
        r"""Throw exception for the given stopping condition.
349
350
        Raises:
351
                * FesException: Thrown when the number of function/fitness evaluations is reached.
352
                * GenException: Thrown when the number of algorithms generations/iterations is reached.
353
                * RefException: Thrown when the reference values is reached.
354
                * TimeException: Thrown when algorithm exceeds time run limit.
355
        """
356
        # dtime = datetime.now() - self.startTime
357
        if self.Evals >= self.nFES:
358
            raise FesException()
359
        if self.Iters >= self.nGEN:
360
            raise GenException()
361
        # if self.runTime is not None and self.runTime >= dtime: raise TimeException()
362
        if self.refValue >= self.x_f:
363
            raise RefException()
364
365
    def eval(self, A):
366
        r"""Evaluate solution.
367
368
        Args:
369
                A (numpy.ndarray): Solution to evaluate.
370
371
        Returns:
372
                float: Function/fitness values of solution.
373
374
        See Also:
375
                * :func:`NiaPy.util.ThrowingTask.stopCondE`
376
                * :func:`NiaPy.util.StoppingTask.eval`
377
        """
378
        self.stopCondE()
379
        return StoppingTask.eval(self, A)
380
381
382
class MoveTask(StoppingTask):
383
    def __init__(self, o=None, fo=None, M=None, fM=None, optF=None, **kwargs):
384
        r"""Initialize task class for optimization.
385
386
        Arguments:
387
                o (numpy.ndarray[Union[float, int]]): Array for shifting.
388
                of (Callable[numpy.ndarray[Union[float, int]]]): Function applied on shifted input.
389
                M (numpy.ndarray[Union[float, int]]): Matrix for rotating.
390
                fM (Callable[numpy.ndarray[Union[float, int]]]): Function applied after rotating.
391
392
        See Also:
393
                * :func:`NiaPy.util.StoppingTask.__init__`
394
        """
395
        StoppingTask.__init__(self, **kwargs)
396
        self.o = o if isinstance(o, ndarray) or o is None else asarray(o)
397
        self.M = M if isinstance(M, ndarray) or M is None else asarray(M)
398
        self.fo, self.fM, self.optF = fo, fM, optF
399
400
    def eval(self, A):
401
        r"""Evaluate the solution.
402
403
        Args:
404
                A (numpy.ndarray): Solution to evaluate
405
406
        Returns:
407
                float: Fitness/function value of solution.
408
409
        See Also:
410
                * :func:`NiaPy.util.StoppingTask.stopCond`
411
                * :func:`NiaPy.util.StoppingTask.eval`
412
        """
413
        if self.stopCond():
414
            return inf * self.optType.value
415
        X = A - self.o if self.o is not None else A
416
        X = self.fo(X) if self.fo is not None else X
417
        X = dot(X, self.M) if self.M is not None else X
418
        X = self.fM(X) if self.fM is not None else X
419
        r = StoppingTask.eval(self, X) + (self.optF if self.optF is not None else 0)
420
        if r <= self.x_f:
421
            self.x, self.x_f = A, r
422
        return r
423
424
425
class ScaledTask(Task):
426
    r"""Scaled task.
427
428
    Attributes:
429
            _task (Task): Optimization task with evaluation function.
430
            Lower (numpy.ndarray): Scaled lower limit of search space.
431
            Upper (numpy.ndarray): Scaled upper limit of search space.
432
433
    See Also:
434
            * :class:`NiaPy.util.Task`
435
    """
436
437
    def __init__(self, task, Lower, Upper, **kwargs):
438
        r"""Initialize scaled task.
439
440
        Args:
441
                task (Task): Optimization task to scale to new bounds.
442
                Lower (Union[float, int, numpy.ndarray]): New lower bounds.
443
                Upper (Union[float, int, numpy.ndarray]): New upper bounds.
444
                **kwargs (Dict[str, Any]): Additional arguments.
445
446
        See Also:
447
                * :func:`NiaPy.util.fullArray`
448
        """
449
        Task.__init__(self)
450
        self._task = task
451
        self.D = self._task.D
452
        self.Lower, self.Upper = fullArray(Lower, self.D), fullArray(Upper, self.D)
453
        self.bRange = fabs(Upper - Lower)
454
455
    def stopCond(self):
456
        r"""Test for stopping condition.
457
458
        This function uses `self._task` for checking the stopping criteria.
459
460
        Returns:
461
                bool: `True` if stopping condition is meet else `False`.
462
        """
463
        return self._task.stopCond()
464
465
    def stopCondI(self):
466
        r"""Test for stopping condition and increments the number of algorithm generations/iterations.
467
468
        This function uses `self._task` for checking the stopping criteria.
469
470
        Returns:
471
                bool: `True` if stopping condition is meet else `False`.
472
        """
473
        return self._task.stopCondI()
474
475
    def eval(self, A):
476
        r"""Evaluate solution.
477
478
        Args:
479
                A (numpy.ndarray): Solution for calculating function/fitness value.
480
481
        Returns:
482
                float: Function values of solution.
483
        """
484
        return self._task.eval(A)
485
486
    def evals(self):
487
        r"""Get the number of function evaluations.
488
489
        Returns:
490
                int: Number of function evaluations.
491
        """
492
        return self._task.evals()
493
494
    def iters(self):
495
        r"""Get the number of algorithms generations/iterations.
496
497
        Returns:
498
                int: Number of algorithms generations/iterations.
499
        """
500
        return self._task.iters()
501
502
    def nextIter(self):
503
        r"""Increment the number of iterations/generations.
504
505
        Function uses `self._task` to increment number of generations/iterations.
506
        """
507
        self._task.nextIter()
508
509
510
class TaskConvPrint(StoppingTask):
511
    r"""Task class with printing out new global best solutions found.
512
513
    Attributes:
514
            xb (numpy.ndarray): Global best solution.
515
            xb_f (float): Global best function/fitness values.
516
517
    See Also:
518
            * :class:`NiaPy.util.StoppingTask`
519
    """
520
521
    def __init__(self, **kwargs):
522
        r"""Initialize TaskConvPrint class.
523
524
        Args:
525
                **kwargs (Dict[str, Any]): Additional arguments.
526
527
        See Also:
528
                * :func:`NiaPy.util.StoppingTask.__init__`
529
        """
530
        StoppingTask.__init__(self, **kwargs)
531
        self.xb, self.xb_f = None, inf
532
533
    def eval(self, A):
534
        r"""Evaluate solution.
535
536
        Args:
537
                A (nupy.ndarray): Solution to evaluate.
538
539
        Returns:
540
                float: Function/Fitness values of solution.
541
542
        See Also:
543
                * :func:`NiaPy.util.StoppingTask.eval`
544
        """
545
        x_f = StoppingTask.eval(self, A)
546
        if self.x_f != self.xb_f:
547
            self.xb, self.xb_f = A, x_f
548
            logger.info('nFES:%d nGEN:%d => %s -> %s' % (self.Evals, self.Iters, self.xb, self.xb_f * self.optType.value))
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable logger does not seem to be defined.
Loading history...
549
        return x_f
550
551
552
class TaskConvSave(StoppingTask):
553
    r"""Task class with logging of function evaluations need to reach some function vale.
554
555
    Attributes:
556
            evals (List[int]): List of ints representing when the new global best was found.
557
            x_f_vals (List[float]): List of floats representing function/fitness values found.
558
559
    See Also:
560
            * :class:`NiaPy.util.StoppingTask`
561
    """
562
563
    def __init__(self, **kwargs):
564
        r"""Initialize TaskConvSave class.
565
566
        Args:
567
                **kwargs (Dict[str, Any]): Additional arguments.
568
569
        See Also:
570
                * :func:`NiaPy.util.StoppingTask.__init__`
571
        """
572
        StoppingTask.__init__(self, **kwargs)
573
        self.evals = []
574
        self.x_f_vals = []
575
576
    def eval(self, A):
577
        r"""Evaluate solution.
578
579
        Args:
580
                A (numpy.ndarray): Individual/solution to evaluate.
581
582
        Returns:
583
                float: Function/fitness values of individual.
584
585
        See Also:
586
                * :func:`SNiaPy.util.toppingTask.eval`
587
        """
588
        x_f = StoppingTask.eval(self, A)
589
        if x_f <= self.x_f:
590
            self.evals.append(self.Evals)
591
            self.x_f_vals.append(x_f)
592
        return x_f
593
594
    def return_conv(self):
595
        r"""Get values of x and y axis for plotting covariance graph.
596
597
        Returns:
598
                Tuple[List[int], List[float]]:
599
                        1. List of ints of function evaluations.
600
                        2. List of ints of function/fitness values.
601
        """
602
        return self.evals, self.x_f_vals
603
604
605
class TaskConvPlot(TaskConvSave):
606
    r"""Task class with ability of showing convergence graph.
607
608
    Attributes:
609
            iters (List[int]): List of ints representing when the new global best was found.
610
            x_fs (List[float]): List of floats representing function/fitness values found.
611
612
    See Also:
613
            * :class:`NiaPy.util.StoppingTask`
614
    """
615
616
    def __init__(self, **kwargs):
617
        r"""TODO.
618
619
        Args:
620
                **kwargs (Dict[str, Any]): Additional arguments.
621
622
        See Also:
623
                * :func:`NiaPy.util.StoppingTask.__init__`
624
        """
625
        StoppingTask.__init__(self, **kwargs)
626
        self.fig = plt.figure()
627
        self.ax = self.fig.subplots(nrows=1, ncols=1)
628
        self.ax.set_xlim(0, self.nFES)
629
        self.line, = self.ax.plot(self.iters, self.x_fs, animated=True)
630
        self.ani = anim.FuncAnimation(self.fig, self.updatePlot, blit=True)
631
        self.showPlot()
632
633
    def eval(self, A):
634
        r"""Evaluate solution.
635
636
        Args:
637
                A (numpy.ndarray): Solution to evaluate.
638
639
        Returns:
640
                float: Fitness/function values of solution.
641
        """
642
        x_f = StoppingTask.eval(self, A)
643
        if not self.x_f_vals:
644
            self.x_f_vals.append(x_f)
645
        elif x_f < self.x_f_vals[-1]:
646
            self.x_f_vals.append(x_f)
647
        else:
648
            self.x_f_vals.append(self.x_f_vals[-1])
649
        self.evals.append(self.Evals)
650
        return x_f
651
652
    def showPlot(self):
653
        r"""Animation updating function."""
654
        plt.show(block=False)
655
        plt.pause(0.001)
656
657
    def updatePlot(self, frame):
658
        r"""Update mathplotlib figure.
659
660
        Args:
661
                frame (): TODO
662
663
        Returns:
664
                Tuple[List[float], Any]:
665
                        1. Line
666
        """
667
        if self.x_f_vals:
668
            max_fs, min_fs = self.x_f_vals[0], self.x_f_vals[-1]
669
            self.ax.set_ylim(min_fs + 1, max_fs + 1)
670
            self.line.set_data(self.evals, self.x_f_vals)
671
        return self.line,
672
673
674
class TaskComposition(MoveTask):
675
    def __init__(self, benchmarks=None, rho=None, lamb=None, bias=None, **kwargs):
676
        r"""Initialize of composite function problem.
677
678
        Arguments:
679
                benchmarks (List[Benchmark]): Optimization function to use in composition
680
                delta (numpy.ndarray[float]): TODO
681
                lamb (numpy.ndarray[float]): TODO
682
                bias (numpy.ndarray[float]): TODO
683
684
        See Also:
685
                * :func:`NiaPy.util.MoveTask.__init__`
686
687
        TODO:
688
                Class is a work in progress.
689
        """
690
        MoveTask.__init__(self, **kwargs)
691
692
    def eval(self, A):
693
        r"""TODO.
694
695
        Args:
696
                A:
697
698
        Returns:
699
                float:
700
701
        Todo:
702
                Usage of multiple functions on the same time
703
        """
704
        return inf
705