Passed
Pull Request — master (#206)
by Grega
01:38
created

NiaPy.task.Task.Task.__init__()   C

Complexity

Conditions 10

Size

Total Lines 47
Code Lines 18

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 10
eloc 18
nop 8
dl 0
loc 47
rs 5.9999
c 0
b 0
f 0

How to fix   Complexity    Many Parameters   

Complexity

Complex methods like NiaPy.task.Task.Task.__init__() often indicate that the enclosing class does a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

Many Parameters

Methods with many parameters are not only hard to understand, but their parameters also often become inconsistent when you need more, or different data.

There are several approaches to avoid long parameter lists:

1
# encoding=utf8
2
3
"""The implementation of tasks."""
4
5
import logging
6
from enum import Enum
7
8
from matplotlib import pyplot as plt, animation as anim
9
from numpy import inf, random as rand, asarray
10
from numpy.core.multiarray import ndarray, dot
11
from numpy.core.umath import fabs
12
13
from NiaPy.util import (
14
    limit_repair,
15
    fullArray,
16
    FesException,
17
    GenException,
18
    RefException
19
)
20
from NiaPy.benchmarks.utility import Utility
21
22
# Names exported by a star-import of this module.
__all__ = [
    "OptimizationType",
    "Task",
    "CountingTask",
    "StoppingTask",
    "MoveTask",
    "TaskComposition",
    "TaskConvPlot",
    "TaskConvPrint",
    "TaskConvSave",
    "ThrowingTask",
    "ScaledTask",
]

# NOTE(review): basicConfig() configures the root logger as an import-time
# side effect; it is kept unchanged so existing callers see the same output.
logging.basicConfig()
logger = logging.getLogger("NiaPy.task.Task")
logger.setLevel("INFO")
39
40
41
class OptimizationType(Enum):
    r"""Enum representing type of optimization.

    The member value is a sign factor: ``Task.eval`` multiplies the raw
    function value by it.

    Attributes:
        MINIMIZATION (float): Represents minimization problems (factor ``1.0``).
        MAXIMIZATION (float): Represents maximization problems (factor ``-1.0``).

    """

    MINIMIZATION = 1.0
    MAXIMIZATION = -1.0
52
53
54
class Task:
    r"""Class representing problem to solve with optimization.

    Date:
        2019

    Author:
        Klemen Berkovič

    Attributes:
        D (int): Dimension of the problem.
        benchmark: Object returned by ``Utility().get_benchmark`` or ``None`` when no benchmark was given.
        Lower (numpy.ndarray): Lower bounds of the problem.
        Upper (numpy.ndarray): Upper bounds of the problem.
        bRange (numpy.ndarray): Search range between upper and lower limits.
        optType (OptimizationType): Optimization type to use.
        frepair (Callable): Function used by :func:`Task.repair` to bring a solution back inside the bounds.
        Fun (Callable): Evaluation function taken from ``benchmark.function()``. Only set when a benchmark was given.

    See Also:
        * :class:`NiaPy.benchmarks.utility.Utility`

    """

    D = 0
    benchmark = None
    Lower, Upper, bRange = inf, inf, inf
    optType = OptimizationType.MINIMIZATION

    def __init__(self, D=0, optType=OptimizationType.MINIMIZATION, benchmark=None, Lower=None, Upper=None, frepair=limit_repair, **kwargs):
        r"""Initialize task class for optimization.

        Arguments:
            D (Optional[int]): Number of dimensions.
            optType (Optional[OptimizationType]): Set the type of optimization.
            benchmark (Union[str, Benchmark]): Problem to solve with optimization.
            Lower (Optional[numpy.ndarray]): Lower limits of the problem.
            Upper (Optional[numpy.ndarray]): Upper limits of the problem.
            frepair (Optional[Callable[[numpy.ndarray, numpy.ndarray, numpy.ndarray, Dict[str, Any]], numpy.ndarray]]): Function for repairing individuals components to desired limits.
            **kwargs (Dict[str, Any]): Additional arguments; accepted and ignored by this class.

        See Also:
            * :class:`NiaPy.benchmarks.utility.Utility`
            * :func:`Task.repair`

        """
        self.D = D
        self.optType = optType
        self.benchmark = Utility().get_benchmark(benchmark) if benchmark is not None else None

        # ``Fun`` is deliberately left undefined when there is no benchmark.
        if self.benchmark is not None:
            self.Fun = self.benchmark.function()

        has_benchmark = benchmark is not None
        self.Lower = self._resolve_bound(Lower, "Lower", has_benchmark)
        self.Upper = self._resolve_bound(Upper, "Upper", has_benchmark)

        self.bRange = self.Upper - self.Lower
        self.frepair = frepair

    def _resolve_bound(self, explicit, attr, has_benchmark):
        r"""Pick one bound: explicit value, else the benchmark's ``attr``, else 0, expanded with ``fullArray``."""
        if explicit is not None:
            limit = explicit
        elif has_benchmark:
            limit = getattr(self.benchmark, attr)
        else:
            limit = 0
        return fullArray(limit, self.D)

    def dim(self):
        r"""Get the number of dimensions.

        Returns:
            int: Dimension of problem optimizing.

        """
        return self.D

    def bcLower(self):
        r"""Get the array of lower bound constraint.

        Returns:
            numpy.ndarray: Lower bound.

        """
        return self.Lower

    def bcUpper(self):
        r"""Get the array of upper bound constraint.

        Returns:
            numpy.ndarray: Upper bound.

        """
        return self.Upper

    def bcRange(self):
        r"""Get the range of bound constraint.

        The range is recomputed from ``Upper`` and ``Lower`` on every call
        instead of returning the stored ``bRange``.

        Returns:
            numpy.ndarray: Range between lower and upper bound.

        """
        return self.Upper - self.Lower

    def repair(self, x, rnd=rand):
        r"""Repair solution by delegating to the configured ``frepair`` function.

        Arguments:
            x (numpy.ndarray): Solution to check and repair if needed.
            rnd (mtrand.RandomState): Random number generator.

        Returns:
            numpy.ndarray: Fixed solution.

        See Also:
            * :func:`NiaPy.util.limitRepair`
            * :func:`NiaPy.util.limitInversRepair`
            * :func:`NiaPy.util.wangRepair`
            * :func:`NiaPy.util.randRepair`
            * :func:`NiaPy.util.reflectRepair`

        """
        return self.frepair(x, self.Lower, self.Upper, rnd=rnd)

    def nextIter(self):
        r"""Increments the number of algorithm iterations (no-op in this base class)."""

    def start(self):
        r"""Start stopwatch (no-op in this base class)."""

    def eval(self, A):
        r"""Evaluate the solution A.

        Arguments:
            A (numpy.ndarray): Solution to evaluate.

        Returns:
            float: Function value of the solution multiplied by ``optType.value``.

        """
        return self.Fun(self.D, A) * self.optType.value

    def isFeasible(self, A):
        r"""Check if the solution is feasible.

        Arguments:
            A (Union[numpy.ndarray, Individual]): Solution to check for feasibility.

        Returns:
            bool: `True` if solution is in feasible space else `False`.

        """
        return False not in (A >= self.Lower) and False not in (A <= self.Upper)

    def stopCond(self):
        r"""Check if optimization task should stop.

        Returns:
            bool: Always `False` in this base class.

        """
        return False
230
231
232
class CountingTask(Task):
    r"""Optimization task with added counting of function evaluations and algorithm iterations/generations.

    Attributes:
        Iters (int): Number of algorithm iterations/generations.
        Evals (int): Number of function evaluations.

    See Also:
        * :class:`Task`

    """

    def __init__(self, **kwargs):
        r"""Initialize counting task.

        Args:
            **kwargs (Dict[str, Any]): Additional arguments, forwarded to ``Task.__init__``.

        See Also:
            * :func:`Task.__init__`

        """
        Task.__init__(self, **kwargs)
        self.Iters, self.Evals = 0, 0

    def eval(self, A):
        r"""Evaluate the solution A.

        This function increments function evaluation counter `self.Evals`.
        The counter is only incremented after ``Task.eval`` returns.

        Arguments:
            A (numpy.ndarray): Solutions to evaluate.

        Returns:
            float: Fitness/function values of solution.

        See Also:
            * :func:`Task.eval`

        """
        r = Task.eval(self, A)
        self.Evals += 1
        return r

    def evals(self):
        r"""Get the number of evaluations made.

        Returns:
            int: Number of evaluations made.

        """
        return self.Evals

    def iters(self):
        r"""Get the number of algorithm iterations made.

        Returns:
            int: Number of generations/iterations made by algorithm.

        """
        return self.Iters

    def nextIter(self):
        r"""Increases the number of algorithm iterations made.

        This function increments number of algorithm iterations/generations counter `self.Iters`.

        """
        self.Iters += 1
306
307
308
class StoppingTask(CountingTask):
    r"""Optimization task with implemented checking for stopping criteria.

    Attributes:
        nGEN (int): Maximum number of algorithm iterations/generations.
        nFES (int): Maximum number of function evaluations.
        refValue (float): Reference function/fitness values to reach in optimization.
        x (numpy.ndarray): Best found individual. Initialised to ``None``; this class never updates it.
        x_f (float): Best found individual function/fitness value.

    See Also:
        * :class:`CountingTask`

    """

    def __init__(self, nFES=inf, nGEN=inf, refValue=None, **kwargs):
        r"""Initialize task class for optimization.

        Arguments:
            nFES (Optional[int]): Number of function evaluations.
            nGEN (Optional[int]): Number of generations or iterations.
            refValue (Optional[float]): Reference value of function/fitness function. ``None`` is stored as ``-inf``.
            **kwargs (Dict[str, Any]): Additional arguments, forwarded to ``CountingTask.__init__``.

        See Also:
            * :func:`CountingTask.__init__`

        """
        CountingTask.__init__(self, **kwargs)
        self.refValue = (-inf if refValue is None else refValue)
        self.x, self.x_f = None, inf
        self.nFES, self.nGEN = nFES, nGEN

    def eval(self, A):
        r"""Evaluate solution.

        When a stopping condition already holds, nothing is evaluated or
        counted and ``inf * optType.value`` is returned.

        Args:
            A (numpy.ndarray): Solution to evaluate.

        Returns:
            float: Fitness/function value of solution.

        See Also:
            * :func:`StoppingTask.stopCond`
            * :func:`CountingTask.eval`

        """
        if self.stopCond():
            return inf * self.optType.value

        x_f = CountingTask.eval(self, A)

        # Only the best value is tracked here; ``self.x`` is left untouched.
        if x_f < self.x_f:
            self.x_f = x_f

        return x_f

    def stopCond(self):
        r"""Check if stopping condition reached.

        Returns:
            bool: `True` if number of function evaluations or number of algorithm iterations/generations or reference values is reach else `False`

        """
        # NOTE(review): the reference-value test is strict (>), whereas
        # ThrowingTask.stopCondE uses >= - confirm which one is intended.
        return (self.Evals >= self.nFES) or (self.Iters >= self.nGEN) or (self.refValue > self.x_f)

    def stopCondI(self):
        r"""Check if stopping condition reached and increase number of iterations.

        The condition is evaluated before the iteration counter is incremented.

        Returns:
            bool: `True` if number of function evaluations or number of algorithm iterations/generations or reference values is reach else `False`.

        See Also:
            * :func:`StoppingTask.stopCond`
            * :func:`CountingTask.nextIter`

        """
        r = self.stopCond()
        CountingTask.nextIter(self)
        return r
391
392
393
class ThrowingTask(StoppingTask):
    r"""Task that throws exceptions when stopping condition is met.

    See Also:
        * :class:`StoppingTask`

    """

    def __init__(self, **kwargs):
        r"""Initialize optimization task.

        Args:
            **kwargs (Dict[str, Any]): Additional arguments, forwarded to ``StoppingTask.__init__``.

        See Also:
            * :func:`StoppingTask.__init__`

        """
        StoppingTask.__init__(self, **kwargs)

    def stopCondE(self):
        r"""Throw exception for the given stopping condition.

        Raises:
            * FesException: Thrown when the number of function/fitness evaluations is reached.
            * GenException: Thrown when the number of algorithms generations/iterations is reached.
            * RefException: Thrown when the reference values is reached.

        """
        # TODO: a run-time limit (TimeException) is not implemented.
        if self.Evals >= self.nFES:
            raise FesException()
        if self.Iters >= self.nGEN:
            raise GenException()
        if self.refValue >= self.x_f:
            raise RefException()

    def eval(self, A):
        r"""Evaluate solution.

        Raises the matching exception first if a stopping condition holds.

        Args:
            A (numpy.ndarray): Solution to evaluate.

        Returns:
            float: Function/fitness values of solution.

        See Also:
            * :func:`ThrowingTask.stopCondE`
            * :func:`StoppingTask.eval`

        """
        self.stopCondE()
        return StoppingTask.eval(self, A)
451
452
453
class MoveTask(StoppingTask):
    """Move task implementation."""

    def __init__(self, o=None, fo=None, M=None, fM=None, optF=None, **kwargs):
        r"""Initialize task class for optimization.

        Arguments:
            o (numpy.ndarray[Union[float, int]]): Array for shifting.
            fo (Callable[numpy.ndarray[Union[float, int]]]): Function applied on shifted input.
            M (numpy.ndarray[Union[float, int]]): Matrix for rotating.
            fM (Callable[numpy.ndarray[Union[float, int]]]): Function applied after rotating.
            optF (Optional[float]): Value added to every evaluated function value.
            **kwargs (Dict[str, Any]): Additional arguments, forwarded to ``StoppingTask.__init__``.

        See Also:
            * :func:`StoppingTask.__init__`

        """
        StoppingTask.__init__(self, **kwargs)
        self.o = o if isinstance(o, ndarray) or o is None else asarray(o)
        self.M = M if isinstance(M, ndarray) or M is None else asarray(M)
        self.fo, self.fM, self.optF = fo, fM, optF

    def eval(self, A):
        r"""Evaluate the solution.

        The input is shifted by ``o``, passed through ``fo``, multiplied by
        ``M`` and passed through ``fM`` (each step skipped when ``None``)
        before evaluation; ``optF`` is then added to the result.

        Args:
            A (numpy.ndarray): Solution to evaluate

        Returns:
            float: Fitness/function value of solution.

        See Also:
            * :func:`StoppingTask.stopCond`
            * :func:`CountingTask.eval`

        """
        if self.stopCond():
            return inf * self.optType.value
        X = A - self.o if self.o is not None else A
        X = self.fo(X) if self.fo is not None else X
        X = dot(X, self.M) if self.M is not None else X
        X = self.fM(X) if self.fM is not None else X
        # Evaluate through CountingTask so the best-so-far state is updated
        # once, below, from the same biased value that is returned. Going via
        # StoppingTask.eval would first store the unbiased value in x_f.
        r = CountingTask.eval(self, X) + (self.optF if self.optF is not None else 0)
        if r <= self.x_f:
            self.x, self.x_f = A, r
        return r
500
501
502
class ScaledTask(Task):
    r"""Scaled task.

    Every counting, stopping and evaluation call is delegated to the wrapped
    task; only the bounds are replaced.

    Attributes:
        _task (Task): Optimization task with evaluation function.
        Lower (numpy.ndarray): Scaled lower limit of search space.
        Upper (numpy.ndarray): Scaled upper limit of search space.

    See Also:
        * :class:`Task`

    """

    def __init__(self, task, Lower, Upper, **kwargs):
        r"""Initialize scaled task.

        Args:
            task (Task): Optimization task to scale to new bounds.
            Lower (Union[float, int, numpy.ndarray]): New lower bounds.
            Upper (Union[float, int, numpy.ndarray]): New upper bounds.
            **kwargs (Dict[str, Any]): Additional arguments; accepted and ignored.

        See Also:
            * :func:`NiaPy.util.fullArray`

        """
        Task.__init__(self)
        self._task = task
        self.D = self._task.D
        self.Lower, self.Upper = fullArray(Lower, self.D), fullArray(Upper, self.D)
        # Use the expanded bounds so scalar and list arguments also work.
        self.bRange = fabs(self.Upper - self.Lower)

    def stopCond(self):
        r"""Test for stopping condition.

        This function uses `self._task` for checking the stopping criteria.

        Returns:
            bool: `True` if stopping condition is meet else `False`.

        """
        return self._task.stopCond()

    def stopCondI(self):
        r"""Test for stopping condition and increments the number of algorithm generations/iterations.

        This function uses `self._task` for checking the stopping criteria.

        Returns:
            bool: `True` if stopping condition is meet else `False`.

        """
        return self._task.stopCondI()

    def eval(self, A):
        r"""Evaluate solution.

        The solution is passed to `self._task` unchanged.

        Args:
            A (numpy.ndarray): Solution for calculating function/fitness value.

        Returns:
            float: Function values of solution.

        """
        return self._task.eval(A)

    def evals(self):
        r"""Get the number of function evaluations.

        Returns:
            int: Number of function evaluations.

        """
        return self._task.evals()

    def iters(self):
        r"""Get the number of algorithms generations/iterations.

        Returns:
            int: Number of algorithms generations/iterations.

        """
        return self._task.iters()

    def nextIter(self):
        r"""Increment the number of iterations/generations.

        Function uses `self._task` to increment number of generations/iterations.

        """
        self._task.nextIter()
600
601
602
class TaskConvPrint(StoppingTask):
    r"""Task class with printing out new global best solutions found.

    Attributes:
        xb (numpy.ndarray): Global best solution.
        xb_f (float): Global best function/fitness values.

    See Also:
        * :class:`StoppingTask`

    """

    def __init__(self, **kwargs):
        r"""Initialize TaskConvPrint class.

        Args:
            **kwargs (Dict[str, Any]): Additional arguments, forwarded to ``StoppingTask.__init__``.

        See Also:
            * :func:`StoppingTask.__init__`

        """
        StoppingTask.__init__(self, **kwargs)
        self.xb, self.xb_f = None, inf

    def eval(self, A):
        r"""Evaluate solution and log it when it is a new best.

        Args:
            A (numpy.ndarray): Solution to evaluate.

        Returns:
            float: Function/Fitness values of solution.

        See Also:
            * :func:`StoppingTask.eval`

        """
        x_f = StoppingTask.eval(self, A)
        # StoppingTask.eval changes self.x_f only on improvement, so a
        # mismatch with the last logged value marks a new best.
        if self.x_f != self.xb_f:
            self.xb, self.xb_f = A, x_f
            # Logger arguments are formatted only if the record is emitted.
            logger.info('nFES:%d nGEN:%d => %s -> %s', self.Evals, self.Iters, self.xb, self.xb_f * self.optType.value)
        return x_f
647
648
649
class TaskConvSave(StoppingTask):
    r"""Task class with logging of function evaluations needed to reach some function value.

    Attributes:
        evals (List[int]): List of ints representing when the new global best was found.
        x_f_vals (List[float]): List of floats representing function/fitness values found.

    See Also:
        * :class:`StoppingTask`

    """

    def __init__(self, **kwargs):
        r"""Initialize TaskConvSave class.

        Args:
            **kwargs (Dict[str, Any]): Additional arguments, forwarded to ``StoppingTask.__init__``.

        See Also:
            * :func:`StoppingTask.__init__`

        """
        StoppingTask.__init__(self, **kwargs)
        # NOTE(review): this instance attribute shadows the inherited
        # CountingTask.evals() method, so evals() is not callable on
        # instances of this class; read ``Evals`` for the counter instead.
        self.evals = []
        self.x_f_vals = []

    def eval(self, A):
        r"""Evaluate solution and record it when it matches or improves the best value.

        Args:
            A (numpy.ndarray): Individual/solution to evaluate.

        Returns:
            float: Function/fitness values of individual.

        See Also:
            * :func:`StoppingTask.eval`

        """
        x_f = StoppingTask.eval(self, A)
        # self.x_f has already been lowered by StoppingTask.eval, so this
        # holds exactly when x_f equals the current best value.
        if x_f <= self.x_f:
            self.evals.append(self.Evals)
            self.x_f_vals.append(x_f)
        return x_f

    def return_conv(self):
        r"""Get values of x and y axis for plotting convergence graph.

        Returns:
            Tuple[List[int], List[float]]:
                1. List of ints of function evaluations.
                2. List of floats of function/fitness values.

        """
        return self.evals, self.x_f_vals
707
708
709
class TaskConvPlot(TaskConvSave):
    r"""Task class with ability of showing convergence graph.

    Attributes:
        evals (List[int]): Evaluation counter value recorded at every evaluation.
        x_f_vals (List[float]): Best function/fitness value seen up to each recorded evaluation.
        fig: Matplotlib figure holding the graph.
        ax: Axes the convergence line is drawn on.
        line: Line artist updated by :func:`TaskConvPlot.updatePlot`.
        ani: Animation object driving :func:`TaskConvPlot.updatePlot`.

    See Also:
        * :class:`TaskConvSave`

    """

    def __init__(self, **kwargs):
        r"""Initialize the task and open the convergence figure.

        Args:
            **kwargs (Dict[str, Any]): Additional arguments, forwarded to ``TaskConvSave.__init__``.

        See Also:
            * :func:`TaskConvSave.__init__`

        """
        # TaskConvSave.__init__ creates the ``evals`` / ``x_f_vals`` lists
        # that the plot and eval() rely on.
        TaskConvSave.__init__(self, **kwargs)
        self.fig = plt.figure()
        self.ax = self.fig.subplots(nrows=1, ncols=1)
        # Matplotlib rejects infinite axis limits; nFES defaults to inf.
        if self.nFES != inf:
            self.ax.set_xlim(0, self.nFES)
        self.line, = self.ax.plot(self.evals, self.x_f_vals, animated=True)
        self.ani = anim.FuncAnimation(self.fig, self.updatePlot, blit=True)
        self.showPlot()

    def eval(self, A):
        r"""Evaluate solution and extend the best-so-far history.

        Args:
            A (numpy.ndarray): Solution to evaluate.

        Returns:
            float: Fitness/function values of solution.

        """
        x_f = StoppingTask.eval(self, A)
        # Keep a non-increasing series: repeat the last best unless improved.
        if not self.x_f_vals:
            self.x_f_vals.append(x_f)
        elif x_f < self.x_f_vals[-1]:
            self.x_f_vals.append(x_f)
        else:
            self.x_f_vals.append(self.x_f_vals[-1])
        self.evals.append(self.Evals)
        return x_f

    def showPlot(self):
        r"""Show the figure without blocking and give the GUI a moment to draw."""
        plt.show(block=False)
        plt.pause(0.001)

    def updatePlot(self, frame):
        r"""Update matplotlib figure.

        Args:
            frame: Frame value supplied by the animation; unused.

        Returns:
            Tuple[Any]:
                1. Line

        """
        if self.x_f_vals:
            # The history is non-increasing: first entry is max, last is min.
            max_fs, min_fs = self.x_f_vals[0], self.x_f_vals[-1]
            # Pad by one on both sides so the best value stays visible.
            self.ax.set_ylim(min_fs - 1, max_fs + 1)
            self.line.set_data(self.evals, self.x_f_vals)
        return self.line,
783
784
785
class TaskComposition(MoveTask):
    """Task composition."""

    def __init__(self, benchmarks=None, rho=None, lamb=None, bias=None, **kwargs):
        r"""Initialize of composite function problem.

        ``benchmarks``, ``rho``, ``lamb`` and ``bias`` are accepted but not
        stored or used yet.

        Arguments:
            benchmarks (List[Benchmark]): Optimization function to use in composition
            rho (numpy.ndarray[float]): TODO
            lamb (numpy.ndarray[float]): TODO
            bias (numpy.ndarray[float]): TODO
            **kwargs (Dict[str, Any]): Additional arguments, forwarded to ``MoveTask.__init__``.

        See Also:
            * :func:`MoveTask.__init__`

        TODO:
            Class is a work in progress.

        """
        MoveTask.__init__(self, **kwargs)

    def eval(self, A):
        r"""Placeholder evaluation; always returns ``inf``.

        Args:
            A: Solution to evaluate; currently ignored.

        Returns:
            float: ``inf``.

        Todo:
            Usage of multiple functions on the same time

        """
        return inf
822