Passed
Pull Request — master (#206)
by Grega
01:10
created

NiaPy.task.task.Task.__init__()   C

Complexity

Conditions 10

Size

Total Lines 47
Code Lines 18

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 10
eloc 18
nop 8
dl 0
loc 47
rs 5.9999
c 0
b 0
f 0

How to fix   Complexity    Many Parameters   

Complexity

Complex classes like NiaPy.task.task.Task.__init__() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

Many Parameters

Methods with many parameters are not only hard to understand, but their parameters also often become inconsistent when you need more, or different data.

There are several approaches to avoid long parameter lists:

1
# encoding=utf8
2
3
"""The implementation of tasks."""
4
5
import logging
6
from enum import Enum
7
8
from matplotlib import pyplot as plt, animation as anim
9
from numpy import inf, random as rand, asarray
10
from numpy.core.multiarray import ndarray, dot
11
from numpy.core.umath import fabs
12
13
from NiaPy.util import (
14
    limit_repair,
15
    fullArray,
16
    FesException,
17
    GenException,
18
    RefException
19
)
20
from NiaPy.benchmarks.utility import Utility
21
22
23
logging.basicConfig()
24
logger = logging.getLogger("NiaPy.task.Task")
25
logger.setLevel("INFO")
26
27
28
class OptimizationType(Enum):
29
    r"""Enum representing type of optimization.
30
31
    Attributes:
32
            MINIMIZATION (int): Represents minimization problems and is default optimization type of all algorithms.
33
            MAXIMIZATION (int): Represents maximization problems.
34
35
    """
36
37
    MINIMIZATION = 1.0
38
    MAXIMIZATION = -1.0
39
40
41
class Task:
42
    r"""Class representing problem to solve with optimization.
43
44
    Date:
45
            2019
46
47
    Author:
48
            Klemen Berkovič
49
50
    Attributes:
51
            D (int): Dimension of the problem.
52
            Lower (numpy.ndarray): Lower bounds of the problem.
53
            Upper (numpy.ndarray): Upper bounds of the problem.
54
            bRange (numpy.ndarray): Search range between upper and lower limits.
55
            optType (OptimizationType): Optimization type to use.
56
57
    See Also:
58
            * :class:`NiaPy.util.Utility`
59
60
    """
61
62
    D = 0
63
    benchmark = None
64
    Lower, Upper, bRange = inf, inf, inf
65
    optType = OptimizationType.MINIMIZATION
66
67
    def __init__(self, D=0, optType=OptimizationType.MINIMIZATION, benchmark=None, Lower=None, Upper=None, frepair=limit_repair, **kwargs):
68
        r"""Initialize task class for optimization.
69
70
        Arguments:
71
                D (Optional[int]): Number of dimensions.
72
                optType (Optional[OptimizationType]): Set the type of optimization.
73
                benchmark (Union[str, Benchmark]): Problem to solve with optimization.
74
                Lower (Optional[numpy.ndarray]): Lower limits of the problem.
75
                Upper (Optional[numpy.ndarray]): Upper limits of the problem.
76
                frepair (Optional[Callable[[numpy.ndarray, numpy.ndarray, numpy.ndarray, Dict[str, Any]], numpy.ndarray]]): Function for reparing individuals components to desired limits.
77
78
        See Also:
79
                * `func`:NiaPy.util.Utility.__init__`
80
                * `func`:NiaPy.util.Utility.repair`
81
82
        """
83
84
        # dimension of the problem
85
        self.D = D
86
        # set optimization type
87
        self.optType = optType
88
        # set optimization function
89
        self.benchmark = Utility().get_benchmark(benchmark) if benchmark is not None else None
90
91
        if self.benchmark is not None:
92
            self.Fun = self.benchmark.function() if self.benchmark is not None else None
93
94
        # set Lower limits
95
        if Lower is not None:
96
            self.Lower = fullArray(Lower, self.D)
97
        elif Lower is None and benchmark is not None:
98
            self.Lower = fullArray(self.benchmark.Lower, self.D)
99
        else:
100
            self.Lower = fullArray(0, self.D)
101
102
        # set Upper limits
103
        if Upper is not None:
104
            self.Upper = fullArray(Upper, self.D)
105
        elif Upper is None and benchmark is not None:
106
            self.Upper = fullArray(self.benchmark.Upper, self.D)
107
        else:
108
            self.Upper = fullArray(0, self.D)
109
110
        # set range
111
        self.bRange = self.Upper - self.Lower
112
        # set repair function
113
        self.frepair = frepair
114
115
    def dim(self):
116
        r"""Get the number of dimensions.
117
118
        Returns:
119
                int: Dimension of problem optimizing.
120
121
        """
122
123
        return self.D
124
125
    def bcLower(self):
126
        r"""Get the array of lower bound constraint.
127
128
        Returns:
129
                numpy.ndarray: Lower bound.
130
131
        """
132
133
        return self.Lower
134
135
    def bcUpper(self):
136
        r"""Get the array of upper bound constraint.
137
138
        Returns:
139
                numpy.ndarray: Upper bound.
140
141
        """
142
143
        return self.Upper
144
145
    def bcRange(self):
146
        r"""Get the range of bound constraint.
147
148
        Returns:
149
                numpy.ndarray: Range between lower and upper bound.
150
151
        """
152
153
        return self.Upper - self.Lower
154
155
    def repair(self, x, rnd=rand):
156
        r"""Repair solution and put the solution in the random position inside of the bounds of problem.
157
158
        Arguments:
159
                x (numpy.ndarray): Solution to check and repair if needed.
160
                rnd (mtrand.RandomState): Random number generator.
161
162
        Returns:
163
                numpy.ndarray: Fixed solution.
164
165
        See Also:
166
                * :func:`NiaPy.util.limitRepair`
167
                * :func:`NiaPy.util.limitInversRepair`
168
                * :func:`NiaPy.util.wangRepair`
169
                * :func:`NiaPy.util.randRepair`
170
                * :func:`NiaPy.util.reflectRepair`
171
172
        """
173
174
        return self.frepair(x, self.Lower, self.Upper, rnd=rnd)
175
176
    def nextIter(self):
177
        r"""Increments the number of algorithm iterations."""
178
179
    def start(self):
180
        r"""Start stopwatch."""
181
182
    def eval(self, A):
183
        r"""Evaluate the solution A.
184
185
        Arguments:
186
                A (numpy.ndarray): Solution to evaluate.
187
188
        Returns:
189
                float: Fitness/function values of solution.
190
191
        """
192
193
        return self.Fun(self.D, A) * self.optType.value
194
195
    def isFeasible(self, A):
196
        r"""Check if the solution is feasible.
197
198
        Arguments:
199
                A (Union[numpy.ndarray, Individual]): Solution to check for feasibility.
200
201
        Returns:
202
                bool: `True` if solution is in feasible space else `False`.
203
204
        """
205
206
        return False not in (A >= self.Lower) and False not in (A <= self.Upper)
207
208
    def stopCond(self):
209
        r"""Check if optimization task should stop.
210
211
        Returns:
212
                bool: `True` if stopping condition is meet else `False`.
213
214
        """
215
216
        return False
217
218
219
class CountingTask(Task):
220
    r"""Optimization task with added counting of function evaluations and algorithm iterations/generations.
221
222
    Attributes:
223
            Iters (int): Number of algorithm iterations/generations.
224
            Evals (int): Number of function evaluations.
225
226
    See Also:
227
            * :class:`NiaPy.util.Task`
228
229
    """
230
231
    def __init__(self, **kwargs):
232
        r"""Initialize counting task.
233
234
        Args:
235
                **kwargs (Dict[str, Any]): Additional arguments.
236
237
        See Also:
238
                * :func:`NiaPy.util.Task.__init__`
239
240
        """
241
242
        Task.__init__(self, **kwargs)
243
        self.Iters, self.Evals = 0, 0
244
245
    def eval(self, A):
246
        r"""Evaluate the solution A.
247
248
        This function increments function evaluation counter `self.Evals`.
249
250
        Arguments:
251
                A (numpy.ndarray): Solutions to evaluate.
252
253
        Returns:
254
                float: Fitness/function values of solution.
255
256
        See Also:
257
                * :func:`NiaPy.util.Task.eval`
258
259
        """
260
261
        r = Task.eval(self, A)
262
        self.Evals += 1
263
        return r
264
265
    def evals(self):
266
        r"""Get the number of evaluations made.
267
268
        Returns:
269
                int: Number of evaluations made.
270
271
        """
272
273
        return self.Evals
274
275
    def iters(self):
276
        r"""Get the number of algorithm iteratins made.
277
278
        Returns:
279
                int: Number of generations/iterations made by algorithm.
280
281
        """
282
283
        return self.Iters
284
285
    def nextIter(self):
286
        r"""Increases the number of algorithm iterations made.
287
288
        This function increments number of algorithm iterations/generations counter `self.Iters`.
289
290
        """
291
292
        self.Iters += 1
293
294
295
class StoppingTask(CountingTask):
296
    r"""Optimization task with implemented checking for stopping criterias.
297
298
    Attributes:
299
            nGEN (int): Maximum number of algorithm iterations/generations.
300
            nFES (int): Maximum number of function evaluations.
301
            refValue (float): Reference function/fitness values to reach in optimization.
302
            x (numpy.ndarray): Best found individual.
303
            x_f (float): Best found individual function/fitness value.
304
305
    See Also:
306
            * :class:`NiaPy.util.CountingTask`
307
308
    """
309
310
    def __init__(self, nFES=inf, nGEN=inf, refValue=None, **kwargs):
311
        r"""Initialize task class for optimization.
312
313
        Arguments:
314
                nFES (Optional[int]): Number of function evaluations.
315
                nGEN (Optional[int]): Number of generations or iterations.
316
                refValue (Optional[float]): Reference value of function/fitness function.
317
318
        See Also:
319
                * :func:`NiaPy.util.CountingTask.__init__`
320
321
        """
322
323
        CountingTask.__init__(self, **kwargs)
324
        self.refValue = (-inf if refValue is None else refValue)
325
        self.x, self.x_f = None, inf
326
        self.nFES, self.nGEN = nFES, nGEN
327
328
    def eval(self, A):
329
        r"""Evaluate solution.
330
331
        Args:
332
                A (numpy.ndarray): Solution to evaluate.
333
334
        Returns:
335
                float: Fitness/function value of solution.
336
337
        See Also:
338
                * :func:`NiaPy.util.StoppingTask.stopCond`
339
                * :func:`NiaPy.util.CountingTask.eval`
340
341
        """
342
343
        if self.stopCond():
344
            return inf * self.optType.value
345
346
        x_f = CountingTask.eval(self, A)
347
348
        if x_f < self.x_f:
349
            self.x_f = x_f
350
351
        return x_f
352
353
    def stopCond(self):
354
        r"""Check if stopping condition reached.
355
356
        Returns:
357
                bool: `True` if number of function evaluations or number of algorithm iterations/generations or reference values is reach else `False`
358
359
        """
360
361
        return (self.Evals >= self.nFES) or (self.Iters >= self.nGEN) or (self.refValue > self.x_f)
362
363
    def stopCondI(self):
364
        r"""Check if stopping condition reached and increase number of iterations.
365
366
        Returns:
367
                bool: `True` if number of function evaluations or number of algorithm iterations/generations or reference values is reach else `False`.
368
369
        See Also:
370
                * :func:`NiaPy.util.StoppingTask.stopCond`
371
                * :func:`NiaPy.util.CountingTask.nextIter`
372
373
        """
374
375
        r = self.stopCond()
376
        CountingTask.nextIter(self)
377
        return r
378
379
380
class ThrowingTask(StoppingTask):
381
    r"""Task that throw exceptions when stopping condition is meet.
382
383
    See Also:
384
            * :class:`NiaPy.util.StoppingTask`
385
386
    """
387
388
    def __init__(self, **kwargs):
389
        r"""Initialize optimization task.
390
391
        Args:
392
                **kwargs (Dict[str, Any]): Additional arguments.
393
394
        See Also:
395
                * :func:`NiaPy.util.StoppingTask.__init__`
396
397
        """
398
399
        StoppingTask.__init__(self, **kwargs)
400
401
    def stopCondE(self):
402
        r"""Throw exception for the given stopping condition.
403
404
        Raises:
405
                * FesException: Thrown when the number of function/fitness evaluations is reached.
406
                * GenException: Thrown when the number of algorithms generations/iterations is reached.
407
                * RefException: Thrown when the reference values is reached.
408
                * TimeException: Thrown when algorithm exceeds time run limit.
409
410
        """
411
412
        # dtime = datetime.now() - self.startTime
413
        if self.Evals >= self.nFES:
414
            raise FesException()
415
        if self.Iters >= self.nGEN:
416
            raise GenException()
417
        # if self.runTime is not None and self.runTime >= dtime: raise TimeException()
418
        if self.refValue >= self.x_f:
419
            raise RefException()
420
421
    def eval(self, A):
422
        r"""Evaluate solution.
423
424
        Args:
425
                A (numpy.ndarray): Solution to evaluate.
426
427
        Returns:
428
                float: Function/fitness values of solution.
429
430
        See Also:
431
                * :func:`NiaPy.util.ThrowingTask.stopCondE`
432
                * :func:`NiaPy.util.StoppingTask.eval`
433
434
        """
435
436
        self.stopCondE()
437
        return StoppingTask.eval(self, A)
438
439
440
class MoveTask(StoppingTask):
441
    """Move task implementation."""
442
443
    def __init__(self, o=None, fo=None, M=None, fM=None, optF=None, **kwargs):
444
        r"""Initialize task class for optimization.
445
446
        Arguments:
447
                o (numpy.ndarray[Union[float, int]]): Array for shifting.
448
                of (Callable[numpy.ndarray[Union[float, int]]]): Function applied on shifted input.
449
                M (numpy.ndarray[Union[float, int]]): Matrix for rotating.
450
                fM (Callable[numpy.ndarray[Union[float, int]]]): Function applied after rotating.
451
452
        See Also:
453
                * :func:`NiaPy.util.StoppingTask.__init__`
454
455
        """
456
457
        StoppingTask.__init__(self, **kwargs)
458
        self.o = o if isinstance(o, ndarray) or o is None else asarray(o)
459
        self.M = M if isinstance(M, ndarray) or M is None else asarray(M)
460
        self.fo, self.fM, self.optF = fo, fM, optF
461
462
    def eval(self, A):
463
        r"""Evaluate the solution.
464
465
        Args:
466
                A (numpy.ndarray): Solution to evaluate
467
468
        Returns:
469
                float: Fitness/function value of solution.
470
471
        See Also:
472
                * :func:`NiaPy.util.StoppingTask.stopCond`
473
                * :func:`NiaPy.util.StoppingTask.eval`
474
475
        """
476
477
        if self.stopCond():
478
            return inf * self.optType.value
479
        X = A - self.o if self.o is not None else A
480
        X = self.fo(X) if self.fo is not None else X
481
        X = dot(X, self.M) if self.M is not None else X
482
        X = self.fM(X) if self.fM is not None else X
483
        r = StoppingTask.eval(self, X) + (self.optF if self.optF is not None else 0)
484
        if r <= self.x_f:
485
            self.x, self.x_f = A, r
486
        return r
487
488
489
class ScaledTask(Task):
490
    r"""Scaled task.
491
492
    Attributes:
493
            _task (Task): Optimization task with evaluation function.
494
            Lower (numpy.ndarray): Scaled lower limit of search space.
495
            Upper (numpy.ndarray): Scaled upper limit of search space.
496
497
    See Also:
498
            * :class:`NiaPy.util.Task`
499
500
    """
501
502
    def __init__(self, task, Lower, Upper, **kwargs):
503
        r"""Initialize scaled task.
504
505
        Args:
506
                task (Task): Optimization task to scale to new bounds.
507
                Lower (Union[float, int, numpy.ndarray]): New lower bounds.
508
                Upper (Union[float, int, numpy.ndarray]): New upper bounds.
509
                **kwargs (Dict[str, Any]): Additional arguments.
510
511
        See Also:
512
                * :func:`NiaPy.util.fullArray`
513
514
        """
515
516
        Task.__init__(self)
517
        self._task = task
518
        self.D = self._task.D
519
        self.Lower, self.Upper = fullArray(Lower, self.D), fullArray(Upper, self.D)
520
        self.bRange = fabs(Upper - Lower)
521
522
    def stopCond(self):
523
        r"""Test for stopping condition.
524
525
        This function uses `self._task` for checking the stopping criteria.
526
527
        Returns:
528
                bool: `True` if stopping condition is meet else `False`.
529
530
        """
531
532
        return self._task.stopCond()
533
534
    def stopCondI(self):
535
        r"""Test for stopping condition and increments the number of algorithm generations/iterations.
536
537
        This function uses `self._task` for checking the stopping criteria.
538
539
        Returns:
540
                bool: `True` if stopping condition is meet else `False`.
541
542
        """
543
544
        return self._task.stopCondI()
545
546
    def eval(self, A):
547
        r"""Evaluate solution.
548
549
        Args:
550
                A (numpy.ndarray): Solution for calculating function/fitness value.
551
552
        Returns:
553
                float: Function values of solution.
554
555
        """
556
557
        return self._task.eval(A)
558
559
    def evals(self):
560
        r"""Get the number of function evaluations.
561
562
        Returns:
563
                int: Number of function evaluations.
564
565
        """
566
567
        return self._task.evals()
568
569
    def iters(self):
570
        r"""Get the number of algorithms generations/iterations.
571
572
        Returns:
573
                int: Number of algorithms generations/iterations.
574
575
        """
576
577
        return self._task.iters()
578
579
    def nextIter(self):
580
        r"""Increment the number of iterations/generations.
581
582
        Function uses `self._task` to increment number of generations/iterations.
583
584
        """
585
586
        self._task.nextIter()
587
588
589
class TaskConvPrint(StoppingTask):
590
    r"""Task class with printing out new global best solutions found.
591
592
    Attributes:
593
            xb (numpy.ndarray): Global best solution.
594
            xb_f (float): Global best function/fitness values.
595
596
    See Also:
597
            * :class:`NiaPy.util.StoppingTask`
598
599
    """
600
601
    def __init__(self, **kwargs):
602
        r"""Initialize TaskConvPrint class.
603
604
        Args:
605
                **kwargs (Dict[str, Any]): Additional arguments.
606
607
        See Also:
608
                * :func:`NiaPy.util.StoppingTask.__init__`
609
610
        """
611
612
        StoppingTask.__init__(self, **kwargs)
613
        self.xb, self.xb_f = None, inf
614
615
    def eval(self, A):
616
        r"""Evaluate solution.
617
618
        Args:
619
                A (nupy.ndarray): Solution to evaluate.
620
621
        Returns:
622
                float: Function/Fitness values of solution.
623
624
        See Also:
625
                * :func:`NiaPy.util.StoppingTask.eval`
626
627
        """
628
629
        x_f = StoppingTask.eval(self, A)
630
        if self.x_f != self.xb_f:
631
            self.xb, self.xb_f = A, x_f
632
            logger.info('nFES:%d nGEN:%d => %s -> %s' % (self.Evals, self.Iters, self.xb, self.xb_f * self.optType.value))
633
        return x_f
634
635
636
class TaskConvSave(StoppingTask):
637
    r"""Task class with logging of function evaluations need to reach some function vale.
638
639
    Attributes:
640
            evals (List[int]): List of ints representing when the new global best was found.
641
            x_f_vals (List[float]): List of floats representing function/fitness values found.
642
643
    See Also:
644
            * :class:`NiaPy.util.StoppingTask`
645
646
    """
647
648
    def __init__(self, **kwargs):
649
        r"""Initialize TaskConvSave class.
650
651
        Args:
652
                **kwargs (Dict[str, Any]): Additional arguments.
653
654
        See Also:
655
                * :func:`NiaPy.util.StoppingTask.__init__`
656
657
        """
658
659
        StoppingTask.__init__(self, **kwargs)
660
        self.evals = []
661
        self.x_f_vals = []
662
663
    def eval(self, A):
664
        r"""Evaluate solution.
665
666
        Args:
667
                A (numpy.ndarray): Individual/solution to evaluate.
668
669
        Returns:
670
                float: Function/fitness values of individual.
671
672
        See Also:
673
                * :func:`SNiaPy.util.toppingTask.eval`
674
675
        """
676
677
        x_f = StoppingTask.eval(self, A)
678
        if x_f <= self.x_f:
679
            self.evals.append(self.Evals)
680
            self.x_f_vals.append(x_f)
681
        return x_f
682
683
    def return_conv(self):
684
        r"""Get values of x and y axis for plotting covariance graph.
685
686
        Returns:
687
                Tuple[List[int], List[float]]:
688
                        1. List of ints of function evaluations.
689
                        2. List of ints of function/fitness values.
690
691
        """
692
693
        return self.evals, self.x_f_vals
694
695
696
class TaskConvPlot(TaskConvSave):
697
    r"""Task class with ability of showing convergence graph.
698
699
    Attributes:
700
            iters (List[int]): List of ints representing when the new global best was found.
701
            x_fs (List[float]): List of floats representing function/fitness values found.
702
703
    See Also:
704
            * :class:`NiaPy.util.StoppingTask`
705
706
    """
707
708
    def __init__(self, **kwargs):
709
        r"""TODO.
710
711
        Args:
712
                **kwargs (Dict[str, Any]): Additional arguments.
713
714
        See Also:
715
                * :func:`NiaPy.util.StoppingTask.__init__`
716
717
        """
718
719
        StoppingTask.__init__(self, **kwargs)
720
        self.fig = plt.figure()
721
        self.ax = self.fig.subplots(nrows=1, ncols=1)
722
        self.ax.set_xlim(0, self.nFES)
723
        self.line, = self.ax.plot(self.iters, self.x_fs, animated=True)
724
        self.ani = anim.FuncAnimation(self.fig, self.updatePlot, blit=True)
725
        self.showPlot()
726
727
    def eval(self, A):
728
        r"""Evaluate solution.
729
730
        Args:
731
                A (numpy.ndarray): Solution to evaluate.
732
733
        Returns:
734
                float: Fitness/function values of solution.
735
736
        """
737
738
        x_f = StoppingTask.eval(self, A)
739
        if not self.x_f_vals:
740
            self.x_f_vals.append(x_f)
741
        elif x_f < self.x_f_vals[-1]:
742
            self.x_f_vals.append(x_f)
743
        else:
744
            self.x_f_vals.append(self.x_f_vals[-1])
745
        self.evals.append(self.Evals)
746
        return x_f
747
748
    def showPlot(self):
749
        r"""Animation updating function."""
750
        plt.show(block=False)
751
        plt.pause(0.001)
752
753
    def updatePlot(self, frame):
754
        r"""Update mathplotlib figure.
755
756
        Args:
757
                frame (): TODO
758
759
        Returns:
760
                Tuple[List[float], Any]:
761
                        1. Line
762
763
        """
764
765
        if self.x_f_vals:
766
            max_fs, min_fs = self.x_f_vals[0], self.x_f_vals[-1]
767
            self.ax.set_ylim(min_fs + 1, max_fs + 1)
768
            self.line.set_data(self.evals, self.x_f_vals)
769
        return self.line,
770
771
772
class TaskComposition(MoveTask):
773
    """Task compostion."""
774
775
    def __init__(self, benchmarks=None, rho=None, lamb=None, bias=None, **kwargs):
776
        r"""Initialize of composite function problem.
777
778
        Arguments:
779
                benchmarks (List[Benchmark]): Optimization function to use in composition
780
                delta (numpy.ndarray[float]): TODO
781
                lamb (numpy.ndarray[float]): TODO
782
                bias (numpy.ndarray[float]): TODO
783
784
        See Also:
785
                * :func:`NiaPy.util.MoveTask.__init__`
786
787
        TODO:
788
                Class is a work in progress.
789
790
        """
791
792
        MoveTask.__init__(self, **kwargs)
793
794
    def eval(self, A):
795
        r"""TODO.
796
797
        Args:
798
                A:
799
800
        Returns:
801
                float:
802
803
        Todo:
804
                Usage of multiple functions on the same time
805
806
        """
807
808
        return inf
809