Passed
Pull Request — master (#206)
by Grega
01:07
created

NiaPy.task.Task.Task.__init__()   C

Complexity

Conditions 10

Size

Total Lines 47
Code Lines 18

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 10
eloc 18
nop 8
dl 0
loc 47
rs 5.9999
c 0
b 0
f 0

How to fix   Complexity    Many Parameters   

Complexity

Complex methods like NiaPy.task.Task.Task.__init__() often do a lot of different things, which usually means the enclosing class has too many responsibilities. To break such a class down, we need to identify a cohesive component within it. A common approach to finding such a component is to look for fields or methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

Many Parameters

Methods with many parameters are not only hard to understand, but their parameters also often become inconsistent when you need more, or different data.

There are several approaches to avoid long parameter lists:

1
"""The implementation of tasks."""
2
3
import logging
4
from enum import Enum
5
6
from matplotlib import pyplot as plt, animation as anim
7
from numpy import inf, random as rand, asarray
8
from numpy.core.multiarray import ndarray, dot
9
from numpy.core.umath import fabs
10
11
from NiaPy.util import (
12
    limit_repair,
13
    fullArray,
14
    FesException,
15
    GenException,
16
    RefException
17
)
18
from NiaPy.benchmarks.utility import Utility
19
20
# Public API of this module: the optimization-type enum and every task class.
__all__ = [
    "OptimizationType",
    "Task",
    "CountingTask",
    "StoppingTask",
    "MoveTask",
    "TaskComposition",
    "TaskConvPlot",
    "TaskConvPrint",
    "TaskConvSave",
    "ThrowingTask",
    "ScaledTask"
]
33
34
# Configure the root logger with default handlers so that messages emitted
# by the task classes below are visible without extra setup by the caller.
logging.basicConfig()
# Module-level logger used by the convergence-printing task.
logger = logging.getLogger("NiaPy.task.Task")
logger.setLevel("INFO")
37
38
39
class OptimizationType(Enum):
    r"""Enum representing type of optimization.

    The member value is the sign that task classes multiply the raw
    function value by, so that every problem is handled as a minimization.

    Attributes:
        MINIMIZATION (float): Represents minimization problems (factor ``1.0``); used as the default optimization type.
        MAXIMIZATION (float): Represents maximization problems (factor ``-1.0``).

    """

    MINIMIZATION = 1.0
    MAXIMIZATION = -1.0
50
51
52
class Task:
    r"""Class representing problem to solve with optimization.

    Date:
        2019

    Author:
        Klemen Berkovič

    Attributes:
        D (int): Dimension of the problem.
        benchmark: Benchmark object returned by ``Utility().get_benchmark`` or ``None`` when no benchmark was given.
        Fun: Evaluation function returned by ``benchmark.function()`` or ``None`` when no benchmark was given.
        Lower (numpy.ndarray): Lower bounds of the problem.
        Upper (numpy.ndarray): Upper bounds of the problem.
        bRange (numpy.ndarray): Search range between upper and lower limits.
        optType (OptimizationType): Optimization type to use.
        frepair (Callable): Function used by :func:`repair` to bring a solution back inside the bounds.

    See Also:
        * :class:`NiaPy.benchmarks.utility.Utility`

    """

    D = 0
    benchmark = None
    Lower, Upper, bRange = inf, inf, inf
    optType = OptimizationType.MINIMIZATION

    def __init__(self, D=0, optType=OptimizationType.MINIMIZATION, benchmark=None, Lower=None, Upper=None, frepair=limit_repair, **kwargs):
        r"""Initialize task class for optimization.

        Args:
            D (Optional[int]): Number of dimensions.
            optType (Optional[OptimizationType]): Set the type of optimization.
            benchmark (Union[str, Benchmark]): Problem to solve with optimization.
            Lower (Optional[numpy.ndarray]): Lower limits of the problem.
            Upper (Optional[numpy.ndarray]): Upper limits of the problem.
            frepair (Optional[Callable[[numpy.ndarray, numpy.ndarray, numpy.ndarray, Dict[str, Any]], numpy.ndarray]]): Function for repairing individual components to desired limits.
            **kwargs (Dict[str, Any]): Additional arguments; accepted and ignored so subclasses can forward their keyword arguments unchanged.

        See Also:
            * :func:`NiaPy.benchmarks.utility.Utility.get_benchmark`

        """

        self.D = D
        self.optType = optType
        # Resolve the benchmark (name or object) only when one was supplied.
        self.benchmark = Utility().get_benchmark(benchmark) if benchmark is not None else None
        # Always define Fun so the attribute exists even without a benchmark.
        self.Fun = self.benchmark.function() if self.benchmark is not None else None
        self.Lower = self._resolve_bound(Lower, "Lower")
        self.Upper = self._resolve_bound(Upper, "Upper")
        self.bRange = self.Upper - self.Lower
        self.frepair = frepair

    def _resolve_bound(self, explicit, benchmark_attr):
        r"""Build one bound: the explicit value, else the benchmark's bound, else zero.

        Args:
            explicit: Bound passed by the caller or ``None``.
            benchmark_attr (str): Name of the benchmark attribute holding the fallback bound (``"Lower"`` or ``"Upper"``).

        Returns:
            Result of ``fullArray`` called with the chosen value and ``self.D``.

        """

        if explicit is not None:
            return fullArray(explicit, self.D)
        if self.benchmark is not None:
            return fullArray(getattr(self.benchmark, benchmark_attr), self.D)
        return fullArray(0, self.D)

    def dim(self):
        r"""Get the number of dimensions.

        Returns:
            int: Dimension of problem optimizing.

        """

        return self.D

    def bcLower(self):
        r"""Get the array of lower bound constraint.

        Returns:
            numpy.ndarray: Lower bound.

        """

        return self.Lower

    def bcUpper(self):
        r"""Get the array of upper bound constraint.

        Returns:
            numpy.ndarray: Upper bound.

        """

        return self.Upper

    def bcRange(self):
        r"""Get the range of bound constraint.

        The range is recomputed from the current bounds on every call.

        Returns:
            numpy.ndarray: Range between lower and upper bound.

        """

        return self.Upper - self.Lower

    def repair(self, x, rnd=rand):
        r"""Repair a solution using the configured repair function.

        Args:
            x (numpy.ndarray): Solution to check and repair if needed.
            rnd (mtrand.RandomState): Random number generator forwarded to the repair function.

        Returns:
            numpy.ndarray: Fixed solution, as returned by ``self.frepair``.

        """

        return self.frepair(x, self.Lower, self.Upper, rnd=rnd)

    def nextIter(self):
        r"""Increments the number of algorithm iterations (no-op in the base class)."""

    def start(self):
        r"""Start stopwatch (no-op in the base class)."""

    def eval(self, A):
        r"""Evaluate the solution A.

        Args:
            A (numpy.ndarray): Solution to evaluate.

        Returns:
            float: Function value multiplied by the optimization type's sign factor.

        """

        return self.Fun(self.D, A) * self.optType.value

    def isFeasible(self, A):
        r"""Check if the solution is feasible.

        Args:
            A (Union[numpy.ndarray, Individual]): Solution to check for feasibility.

        Returns:
            bool: `True` if solution is in feasible space else `False`.

        """

        return False not in (A >= self.Lower) and False not in (A <= self.Upper)

    def stopCond(self):
        r"""Check if optimization task should stop.

        Returns:
            bool: Always `False` in the base class.

        """

        return False
228
229
230
class CountingTask(Task):
    r"""Optimization task that counts function evaluations and algorithm iterations/generations.

    Attributes:
        Iters (int): Number of algorithm iterations/generations.
        Evals (int): Number of function evaluations.

    See Also:
        * :class:`Task`

    """

    def __init__(self, **kwargs):
        r"""Initialize counting task with both counters at zero.

        Args:
            **kwargs (Dict[str, Any]): Arguments forwarded to :func:`Task.__init__`.

        """

        Task.__init__(self, **kwargs)
        self.Iters = 0
        self.Evals = 0

    def eval(self, A):
        r"""Evaluate the solution A and count the evaluation.

        The counter `self.Evals` is incremented only after the base
        evaluation returned.

        Args:
            A (numpy.ndarray): Solution to evaluate.

        Returns:
            float: Fitness/function value of the solution.

        """

        fitness = Task.eval(self, A)
        self.Evals += 1
        return fitness

    def evals(self):
        r"""Get the number of evaluations made.

        Returns:
            int: Number of evaluations made.

        """

        return self.Evals

    def iters(self):
        r"""Get the number of algorithm iterations made.

        Returns:
            int: Number of generations/iterations made by algorithm.

        """

        return self.Iters

    def nextIter(self):
        r"""Increment the iteration/generation counter `self.Iters` by one."""

        self.Iters += 1
304
305
306
class StoppingTask(CountingTask):
    r"""Optimization task that checks the stopping criteria.

    Attributes:
        nGEN (int): Maximum number of algorithm iterations/generations.
        nFES (int): Maximum number of function evaluations.
        refValue (float): Reference function/fitness value to reach in optimization.
        x (numpy.ndarray): Best found individual; initialized to ``None`` and not updated by this class's ``eval``.
        x_f (float): Best found function/fitness value.

    See Also:
        * :class:`CountingTask`

    """

    def __init__(self, nFES=inf, nGEN=inf, refValue=None, **kwargs):
        r"""Initialize task class for optimization.

        Args:
            nFES (Optional[int]): Number of function evaluations.
            nGEN (Optional[int]): Number of generations or iterations.
            refValue (Optional[float]): Reference value of function/fitness function; ``None`` is stored as ``-inf``.
            **kwargs (Dict[str, Any]): Arguments forwarded to :func:`CountingTask.__init__`.

        """

        CountingTask.__init__(self, **kwargs)
        if refValue is None:
            self.refValue = -inf
        else:
            self.refValue = refValue
        self.x = None
        self.x_f = inf
        self.nFES = nFES
        self.nGEN = nGEN

    def eval(self, A):
        r"""Evaluate solution unless a stopping condition already holds.

        Args:
            A (numpy.ndarray): Solution to evaluate.

        Returns:
            float: Fitness/function value of solution, or ``inf`` times the optimization-type factor when the task has already stopped.

        See Also:
            * :func:`StoppingTask.stopCond`
            * :func:`CountingTask.eval`

        """

        if self.stopCond():
            return inf * self.optType.value

        fitness = CountingTask.eval(self, A)

        # Track only the best value here; `self.x` is left untouched.
        if fitness < self.x_f:
            self.x_f = fitness

        return fitness

    def stopCond(self):
        r"""Check if stopping condition reached.

        Returns:
            bool: `True` if the evaluation limit or the iteration limit is reached, or the best value dropped below the reference value, else `False`.

        """

        if self.Evals >= self.nFES:
            return True
        if self.Iters >= self.nGEN:
            return True
        return self.refValue > self.x_f

    def stopCondI(self):
        r"""Check if stopping condition reached and increase number of iterations.

        The condition is evaluated before the iteration counter is incremented.

        Returns:
            bool: Result of :func:`StoppingTask.stopCond` taken before the increment.

        See Also:
            * :func:`StoppingTask.stopCond`
            * :func:`CountingTask.nextIter`

        """

        stopped = self.stopCond()
        CountingTask.nextIter(self)
        return stopped
389
390
391
class ThrowingTask(StoppingTask):
    r"""Task that throws exceptions when a stopping condition is met.

    See Also:
        * :class:`StoppingTask`

    """

    def __init__(self, **kwargs):
        r"""Initialize optimization task.

        Args:
            **kwargs (Dict[str, Any]): Arguments forwarded to :func:`StoppingTask.__init__`.

        """

        StoppingTask.__init__(self, **kwargs)

    def stopCondE(self):
        r"""Throw exception for the given stopping condition.

        The checks run in the order evaluations, iterations, reference value.
        A run-time limit is not implemented here.

        Raises:
            FesException: Thrown when the number of function/fitness evaluations is reached.
            GenException: Thrown when the number of algorithm generations/iterations is reached.
            RefException: Thrown when the reference value is reached.

        """

        if self.Evals >= self.nFES:
            raise FesException()
        if self.Iters >= self.nGEN:
            raise GenException()
        if self.refValue >= self.x_f:
            raise RefException()

    def eval(self, A):
        r"""Evaluate solution, raising first if a stopping condition holds.

        Args:
            A (numpy.ndarray): Solution to evaluate.

        Returns:
            float: Function/fitness value of solution.

        See Also:
            * :func:`ThrowingTask.stopCondE`
            * :func:`StoppingTask.eval`

        """

        self.stopCondE()
        return StoppingTask.eval(self, A)
449
450
451
class MoveTask(StoppingTask):
    """Task that shifts and rotates a solution before evaluating it."""

    def __init__(self, o=None, fo=None, M=None, fM=None, optF=None, **kwargs):
        r"""Initialize task class for optimization.

        Args:
            o (numpy.ndarray[Union[float, int]]): Array for shifting; converted with ``asarray`` when it is not already an ndarray.
            fo (Callable[numpy.ndarray[Union[float, int]]]): Function applied on shifted input.
            M (numpy.ndarray[Union[float, int]]): Matrix for rotating; converted with ``asarray`` when it is not already an ndarray.
            fM (Callable[numpy.ndarray[Union[float, int]]]): Function applied after rotating.
            optF (Optional[float]): Value added to the evaluated fitness.
            **kwargs (Dict[str, Any]): Arguments forwarded to :func:`StoppingTask.__init__`.

        """

        StoppingTask.__init__(self, **kwargs)
        if o is not None and not isinstance(o, ndarray):
            o = asarray(o)
        if M is not None and not isinstance(M, ndarray):
            M = asarray(M)
        self.o = o
        self.M = M
        self.fo = fo
        self.fM = fM
        self.optF = optF

    def eval(self, A):
        r"""Evaluate the solution after applying the configured transformations.

        The steps run in this order, each only when configured:
        subtract ``o``, apply ``fo``, multiply by ``M``, apply ``fM``.

        Args:
            A (numpy.ndarray): Solution to evaluate.

        Returns:
            float: Fitness/function value of solution plus ``optF`` when set, or ``inf`` times the optimization-type factor when the task has already stopped.

        See Also:
            * :func:`StoppingTask.stopCond`
            * :func:`StoppingTask.eval`

        """

        if self.stopCond():
            return inf * self.optType.value

        moved = A
        if self.o is not None:
            moved = moved - self.o
        if self.fo is not None:
            moved = self.fo(moved)
        if self.M is not None:
            moved = dot(moved, self.M)
        if self.fM is not None:
            moved = self.fM(moved)

        offset = 0 if self.optF is None else self.optF
        fitness = StoppingTask.eval(self, moved) + offset

        # NOTE(review): StoppingTask.eval has already stored the un-offset
        # best value in self.x_f, so this comparison is against that value.
        if fitness <= self.x_f:
            self.x, self.x_f = A, fitness
        return fitness
498
499
500
class ScaledTask(Task):
    r"""Scaled task.

    Wraps another task, replaces its bounds and delegates evaluation,
    counting and stopping checks to the wrapped task.

    Attributes:
        _task (Task): Optimization task with evaluation function.
        Lower (numpy.ndarray): Scaled lower limit of search space.
        Upper (numpy.ndarray): Scaled upper limit of search space.
        bRange (numpy.ndarray): Absolute difference between the scaled limits.

    See Also:
        * :class:`Task`

    """

    def __init__(self, task, Lower, Upper, **kwargs):
        r"""Initialize scaled task.

        Args:
            task (Task): Optimization task to scale to new bounds.
            Lower (Union[float, int, numpy.ndarray]): New lower bounds.
            Upper (Union[float, int, numpy.ndarray]): New upper bounds.
            **kwargs (Dict[str, Any]): Additional arguments (not used).

        See Also:
            * :func:`NiaPy.util.fullArray`

        """

        Task.__init__(self)
        self._task = task
        self.D = self._task.D
        self.Lower, self.Upper = fullArray(Lower, self.D), fullArray(Upper, self.D)
        # Use the normalized bounds, not the raw arguments: the raw values
        # may be scalars or plain sequences that do not support subtraction.
        self.bRange = fabs(self.Upper - self.Lower)

    def stopCond(self):
        r"""Test for stopping condition.

        This function uses `self._task` for checking the stopping criteria.

        Returns:
            bool: `True` if stopping condition is met else `False`.

        """

        return self._task.stopCond()

    def stopCondI(self):
        r"""Test for stopping condition and increment the number of algorithm generations/iterations.

        This function uses `self._task` for checking the stopping criteria.

        Returns:
            bool: `True` if stopping condition is met else `False`.

        """

        return self._task.stopCondI()

    def eval(self, A):
        r"""Evaluate solution with the wrapped task.

        Args:
            A (numpy.ndarray): Solution for calculating function/fitness value.

        Returns:
            float: Function value of solution.

        """

        return self._task.eval(A)

    def evals(self):
        r"""Get the number of function evaluations of the wrapped task.

        Returns:
            int: Number of function evaluations.

        """

        return self._task.evals()

    def iters(self):
        r"""Get the number of algorithm generations/iterations of the wrapped task.

        Returns:
            int: Number of algorithm generations/iterations.

        """

        return self._task.iters()

    def nextIter(self):
        r"""Increment the number of iterations/generations.

        Function uses `self._task` to increment number of generations/iterations.

        """

        self._task.nextIter()
598
599
600
class TaskConvPrint(StoppingTask):
    r"""Task class that logs every new global best solution found.

    Attributes:
        xb (numpy.ndarray): Global best solution.
        xb_f (float): Global best function/fitness value.

    See Also:
        * :class:`StoppingTask`

    """

    def __init__(self, **kwargs):
        r"""Initialize TaskConvPrint class.

        Args:
            **kwargs (Dict[str, Any]): Arguments forwarded to :func:`StoppingTask.__init__`.

        """

        StoppingTask.__init__(self, **kwargs)
        self.xb = None
        self.xb_f = inf

    def eval(self, A):
        r"""Evaluate solution and log it when the best value changed.

        Args:
            A (numpy.ndarray): Solution to evaluate.

        Returns:
            float: Function/fitness value of solution.

        See Also:
            * :func:`StoppingTask.eval`

        """

        fitness = StoppingTask.eval(self, A)
        # The parent updates self.x_f on improvement; a mismatch with the
        # locally remembered best means a new best was just found.
        best_changed = self.x_f != self.xb_f
        if best_changed:
            self.xb = A
            self.xb_f = fitness
            logger.info('nFES:%d nGEN:%d => %s -> %s' % (self.Evals, self.Iters, self.xb, self.xb_f * self.optType.value))
        return fitness
645
646
647
class TaskConvSave(StoppingTask):
    r"""Task class that records the function evaluations needed to reach each new best function value.

    Attributes:
        evals (List[int]): Evaluation counts at which a new global best was found. This instance attribute shadows the inherited ``evals()`` method.
        x_f_vals (List[float]): Function/fitness values recorded at those evaluations.

    See Also:
        * :class:`StoppingTask`

    """

    def __init__(self, **kwargs):
        r"""Initialize TaskConvSave class with empty history lists.

        Args:
            **kwargs (Dict[str, Any]): Arguments forwarded to :func:`StoppingTask.__init__`.

        """

        StoppingTask.__init__(self, **kwargs)
        self.evals, self.x_f_vals = [], []

    def eval(self, A):
        r"""Evaluate solution and record it when it is at least as good as the best.

        Args:
            A (numpy.ndarray): Individual/solution to evaluate.

        Returns:
            float: Function/fitness value of individual.

        See Also:
            * :func:`StoppingTask.eval`

        """

        fitness = StoppingTask.eval(self, A)
        if fitness > self.x_f:
            return fitness
        self.evals.append(self.Evals)
        self.x_f_vals.append(fitness)
        return fitness

    def return_conv(self):
        r"""Get values of x and y axis for plotting convergence graph.

        Returns:
            Tuple[List[int], List[float]]:
                1. List of ints of function evaluations.
                2. List of floats of function/fitness values.

        """

        return self.evals, self.x_f_vals
705
706
707
class TaskConvPlot(TaskConvSave):
    r"""Task class with ability of showing convergence graph.

    Attributes:
        evals (List[int]): Evaluation counts, one entry per evaluation.
        x_f_vals (List[float]): Best function/fitness value known at each recorded evaluation.
        fig: Matplotlib figure holding the graph.
        ax: Axes of the figure.
        line: Line artist showing the convergence curve.
        ani: Animation object that keeps refreshing the line.

    See Also:
        * :class:`TaskConvSave`

    """

    def __init__(self, **kwargs):
        r"""Initialize the task and open the convergence plot.

        Args:
            **kwargs (Dict[str, Any]): Arguments forwarded to :func:`TaskConvSave.__init__`.

        """

        # TaskConvSave creates the `evals` and `x_f_vals` lists that `eval`
        # and `updatePlot` rely on.
        TaskConvSave.__init__(self, **kwargs)
        self.fig = plt.figure()
        self.ax = self.fig.subplots(nrows=1, ncols=1)
        # Matplotlib rejects infinite axis limits, so only fix the x range
        # when an evaluation budget was actually given.
        if self.nFES != inf:
            self.ax.set_xlim(0, self.nFES)
        self.line, = self.ax.plot(self.evals, self.x_f_vals, animated=True)
        self.ani = anim.FuncAnimation(self.fig, self.updatePlot, blit=True)
        self.showPlot()

    def eval(self, A):
        r"""Evaluate solution and extend the convergence history.

        One point is appended per call: the new value when it improves on
        the last recorded one, otherwise the last recorded value again.

        Args:
            A (numpy.ndarray): Solution to evaluate.

        Returns:
            float: Fitness/function value of solution.

        """

        x_f = StoppingTask.eval(self, A)
        if not self.x_f_vals or x_f < self.x_f_vals[-1]:
            self.x_f_vals.append(x_f)
        else:
            self.x_f_vals.append(self.x_f_vals[-1])
        self.evals.append(self.Evals)
        return x_f

    def showPlot(self):
        r"""Show the figure without blocking and give the GUI a moment to draw."""
        plt.show(block=False)
        plt.pause(0.001)

    def updatePlot(self, frame):
        r"""Update matplotlib figure; used as the animation callback.

        Args:
            frame: Frame value supplied by the animation (not used).

        Returns:
            Tuple[Any]: One-element tuple holding the line artist.

        """

        if self.x_f_vals:
            # History is non-increasing, so first is the max and last the min.
            max_fs, min_fs = self.x_f_vals[0], self.x_f_vals[-1]
            # NOTE(review): both limits are shifted by +1; confirm whether
            # the lower limit was meant to be `min_fs - 1`.
            self.ax.set_ylim(min_fs + 1, max_fs + 1)
            self.line.set_data(self.evals, self.x_f_vals)
        return self.line,
781
782
783
class TaskComposition(MoveTask):
    """Task composition (work in progress)."""

    def __init__(self, benchmarks=None, rho=None, lamb=None, bias=None, **kwargs):
        r"""Initialize composite function problem.

        Only ``**kwargs`` is used at the moment; the remaining parameters
        are accepted but not stored.

        Args:
            benchmarks (List[Benchmark]): Optimization functions to use in composition.
            rho (numpy.ndarray[float]): TODO
            lamb (numpy.ndarray[float]): TODO
            bias (numpy.ndarray[float]): TODO
            **kwargs (Dict[str, Any]): Arguments forwarded to :func:`MoveTask.__init__`.

        TODO:
            Class is a work in progress.

        """

        MoveTask.__init__(self, **kwargs)

    def eval(self, A):
        r"""Placeholder evaluation that ignores the solution.

        Args:
            A: Solution to evaluate (not used).

        Returns:
            float: Always ``inf``.

        Todo:
            Usage of multiple functions on the same time

        """

        return inf
820