Passed
Push — master ( 907090...e51968 )
by Grega
01:21
created

NiaPy.task.task.Task.__init__()   C

Complexity

Conditions 10

Size

Total Lines 47
Code Lines 18

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 10
eloc 18
nop 8
dl 0
loc 47
rs 5.9999
c 0
b 0
f 0

How to fix   Complexity    Many Parameters   

Complexity

Complex classes like NiaPy.task.task.Task.__init__() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

Many Parameters

Methods with many parameters are not only hard to understand, but their parameters also often become inconsistent when you need more, or different data.

There are several approaches to avoid long parameter lists:

1
# encoding=utf8
2
3
"""The implementation of tasks."""
4
5
import logging
6
from enum import Enum
7
8
from matplotlib import pyplot as plt, animation as anim
9
from numpy import inf, random as rand, asarray
10
from numpy.core.multiarray import ndarray, dot
11
from numpy.core.umath import fabs
12
13
from NiaPy.util import (
14
    limit_repair,
15
    fullArray,
16
    FesException,
17
    GenException,
18
    RefException
19
)
20
from NiaPy.benchmarks.utility import Utility
21
22
23
logging.basicConfig()
24
logger = logging.getLogger("NiaPy.task.Task")
25
logger.setLevel("INFO")
26
27
28
class OptimizationType(Enum):
29
    r"""Enum representing type of optimization.
30
31
    Attributes:
32
            MINIMIZATION (int): Represents minimization problems and is default optimization type of all algorithms.
33
            MAXIMIZATION (int): Represents maximization problems.
34
35
    """
36
37
    MINIMIZATION = 1.0
38
    MAXIMIZATION = -1.0
39
40
41
class Task:
42
    r"""Class representing problem to solve with optimization.
43
44
    Date:
45
            2019
46
47
    Author:
48
            Klemen Berkovič and others
49
50
    Attributes:
51
            D (int): Dimension of the problem.
52
            Lower (numpy.ndarray): Lower bounds of the problem.
53
            Upper (numpy.ndarray): Upper bounds of the problem.
54
            bRange (numpy.ndarray): Search range between upper and lower limits.
55
            optType (OptimizationType): Optimization type to use.
56
57
    See Also:
58
            * :class:`NiaPy.util.Utility`
59
60
    """
61
62
    D = 0
63
    benchmark = None
64
    Lower, Upper, bRange = inf, inf, inf
65
    optType = OptimizationType.MINIMIZATION
66
67
    def __init__(self, D=0, optType=OptimizationType.MINIMIZATION, benchmark=None, Lower=None, Upper=None, frepair=limit_repair, **kwargs):
68
        r"""Initialize task class for optimization.
69
70
        Arguments:
71
                D (Optional[int]): Number of dimensions.
72
                optType (Optional[OptimizationType]): Set the type of optimization.
73
                benchmark (Union[str, Benchmark]): Problem to solve with optimization.
74
                Lower (Optional[numpy.ndarray]): Lower limits of the problem.
75
                Upper (Optional[numpy.ndarray]): Upper limits of the problem.
76
                frepair (Optional[Callable[[numpy.ndarray, numpy.ndarray, numpy.ndarray, Dict[str, Any]], numpy.ndarray]]): Function for reparing individuals components to desired limits.
77
78
        See Also:
79
                * `func`:NiaPy.util.Utility.__init__`
80
                * `func`:NiaPy.util.Utility.repair`
81
82
        """
83
84
        # dimension of the problem
85
        self.D = D
86
        # set optimization type
87
        self.optType = optType
88
        # set optimization function
89
        self.benchmark = Utility().get_benchmark(benchmark) if benchmark is not None else None
90
91
        if self.benchmark is not None:
92
            self.Fun = self.benchmark.function() if self.benchmark is not None else None
93
94
        # set Lower limits
95
        if Lower is not None:
96
            self.Lower = fullArray(Lower, self.D)
97
        elif Lower is None and benchmark is not None:
98
            self.Lower = fullArray(self.benchmark.Lower, self.D)
99
        else:
100
            self.Lower = fullArray(0, self.D)
101
102
        # set Upper limits
103
        if Upper is not None:
104
            self.Upper = fullArray(Upper, self.D)
105
        elif Upper is None and benchmark is not None:
106
            self.Upper = fullArray(self.benchmark.Upper, self.D)
107
        else:
108
            self.Upper = fullArray(0, self.D)
109
110
        # set range
111
        self.bRange = self.Upper - self.Lower
112
        # set repair function
113
        self.frepair = frepair
114
115
    def dim(self):
116
        r"""Get the number of dimensions.
117
118
        Returns:
119
                int: Dimension of problem optimizing.
120
121
        """
122
123
        return self.D
124
125
    def bcLower(self):
126
        r"""Get the array of lower bound constraint.
127
128
        Returns:
129
                numpy.ndarray: Lower bound.
130
131
        """
132
133
        return self.Lower
134
135
    def bcUpper(self):
136
        r"""Get the array of upper bound constraint.
137
138
        Returns:
139
                numpy.ndarray: Upper bound.
140
141
        """
142
143
        return self.Upper
144
145
    def bcRange(self):
146
        r"""Get the range of bound constraint.
147
148
        Returns:
149
                numpy.ndarray: Range between lower and upper bound.
150
151
        """
152
153
        return self.Upper - self.Lower
154
155
    def repair(self, x, rnd=rand):
156
        r"""Repair solution and put the solution in the random position inside of the bounds of problem.
157
158
        Arguments:
159
                x (numpy.ndarray): Solution to check and repair if needed.
160
                rnd (mtrand.RandomState): Random number generator.
161
162
        Returns:
163
                numpy.ndarray: Fixed solution.
164
165
        See Also:
166
                * :func:`NiaPy.util.limitRepair`
167
                * :func:`NiaPy.util.limitInversRepair`
168
                * :func:`NiaPy.util.wangRepair`
169
                * :func:`NiaPy.util.randRepair`
170
                * :func:`NiaPy.util.reflectRepair`
171
172
        """
173
174
        return self.frepair(x, self.Lower, self.Upper, rnd=rnd)
175
176
    def nextIter(self):
177
        r"""Increments the number of algorithm iterations."""
178
179
    def start(self):
180
        r"""Start stopwatch."""
181
182
    def eval(self, A):
183
        r"""Evaluate the solution A.
184
185
        Arguments:
186
                A (numpy.ndarray): Solution to evaluate.
187
188
        Returns:
189
                float: Fitness/function values of solution.
190
191
        """
192
193
        return self.Fun(self.D, A) * self.optType.value
194
195
    def isFeasible(self, A):
196
        r"""Check if the solution is feasible.
197
198
        Arguments:
199
                A (Union[numpy.ndarray, Individual]): Solution to check for feasibility.
200
201
        Returns:
202
                bool: `True` if solution is in feasible space else `False`.
203
204
        """
205
206
        return False not in (A >= self.Lower) and False not in (A <= self.Upper)
207
208
    def stopCond(self):
209
        r"""Check if optimization task should stop.
210
211
        Returns:
212
                bool: `True` if stopping condition is meet else `False`.
213
214
        """
215
216
        return False
217
218
219
class CountingTask(Task):
220
    r"""Optimization task with added counting of function evaluations and algorithm iterations/generations.
221
222
    Attributes:
223
            Iters (int): Number of algorithm iterations/generations.
224
            Evals (int): Number of function evaluations.
225
226
    See Also:
227
            * :class:`NiaPy.util.Task`
228
229
    """
230
231
    def __init__(self, **kwargs):
232
        r"""Initialize counting task.
233
234
        Args:
235
                **kwargs (Dict[str, Any]): Additional arguments.
236
237
        See Also:
238
                * :func:`NiaPy.util.Task.__init__`
239
240
        """
241
242
        Task.__init__(self, **kwargs)
243
        self.Iters, self.Evals = 0, 0
244
245
    def eval(self, A):
246
        r"""Evaluate the solution A.
247
248
        This function increments function evaluation counter `self.Evals`.
249
250
        Arguments:
251
                A (numpy.ndarray): Solutions to evaluate.
252
253
        Returns:
254
                float: Fitness/function values of solution.
255
256
        See Also:
257
                * :func:`NiaPy.util.Task.eval`
258
259
        """
260
261
        r = Task.eval(self, A)
262
        self.Evals += 1
263
        return r
264
265
    def evals(self):
266
        r"""Get the number of evaluations made.
267
268
        Returns:
269
                int: Number of evaluations made.
270
271
        """
272
273
        return self.Evals
274
275
    def iters(self):
276
        r"""Get the number of algorithm iteratins made.
277
278
        Returns:
279
                int: Number of generations/iterations made by algorithm.
280
281
        """
282
283
        return self.Iters
284
285
    def nextIter(self):
286
        r"""Increases the number of algorithm iterations made.
287
288
        This function increments number of algorithm iterations/generations counter `self.Iters`.
289
290
        """
291
292
        self.Iters += 1
293
294
295
class StoppingTask(CountingTask):
296
    r"""Optimization task with implemented checking for stopping criterias.
297
298
    Attributes:
299
            nGEN (int): Maximum number of algorithm iterations/generations.
300
            nFES (int): Maximum number of function evaluations.
301
            refValue (float): Reference function/fitness values to reach in optimization.
302
            x (numpy.ndarray): Best found individual.
303
            x_f (float): Best found individual function/fitness value.
304
305
    See Also:
306
            * :class:`NiaPy.util.CountingTask`
307
308
    """
309
310
    def __init__(self, nFES=inf, nGEN=inf, refValue=None, logger=False, **kwargs):
311
        r"""Initialize task class for optimization.
312
313
        Arguments:
314
                nFES (Optional[int]): Number of function evaluations.
315
                nGEN (Optional[int]): Number of generations or iterations.
316
                refValue (Optional[float]): Reference value of function/fitness function.
317
318
        Note:
319
                Storing improvements during the evolutionary cycle is
320
                captured in self.n_evals and self.x_f_vals
321
322
        See Also:
323
                * :func:`NiaPy.util.CountingTask.__init__`
324
325
        """
326
327
        CountingTask.__init__(self, **kwargs)
328
        self.refValue = (-inf if refValue is None else refValue)
329
        self.logger = logger
330
        self.x, self.x_f = None, inf
331
        self.nFES, self.nGEN = nFES, nGEN
332
        self.n_evals = []
333
        self.x_f_vals = []
334
335
    def eval(self, A):
336
        r"""Evaluate solution.
337
338
        Args:
339
                A (numpy.ndarray): Solution to evaluate.
340
341
        Returns:
342
                float: Fitness/function value of solution.
343
344
        See Also:
345
                * :func:`NiaPy.util.StoppingTask.stopCond`
346
                * :func:`NiaPy.util.CountingTask.eval`
347
348
        """
349
350
        if self.stopCond():
351
            return inf * self.optType.value
352
353
        x_f = CountingTask.eval(self, A)
354
355
        if x_f < self.x_f:
356
            self.x_f = x_f
357
            self.n_evals.append(self.Evals)
358
            self.x_f_vals.append(x_f)
359
            if self.logger:
360
                logger.info('nFES:%d => %s' % (self.Evals, self.x_f))
361
362
        return x_f
363
364
    def stopCond(self):
365
        r"""Check if stopping condition reached.
366
367
        Returns:
368
                bool: `True` if number of function evaluations or number of algorithm iterations/generations or reference values is reach else `False`
369
370
        """
371
372
        return (self.Evals >= self.nFES) or (self.Iters >= self.nGEN) or (self.refValue > self.x_f)
373
374
    def stopCondI(self):
375
        r"""Check if stopping condition reached and increase number of iterations.
376
377
        Returns:
378
                bool: `True` if number of function evaluations or number of algorithm iterations/generations or reference values is reach else `False`.
379
380
        See Also:
381
                * :func:`NiaPy.util.StoppingTask.stopCond`
382
                * :func:`NiaPy.util.CountingTask.nextIter`
383
384
        """
385
386
        r = self.stopCond()
387
        CountingTask.nextIter(self)
388
        return r
389
390
    def return_conv(self):
391
        r"""Get values of x and y axis for plotting covariance graph.
392
393
        Returns:
394
                Tuple[List[int], List[float]]:
395
                    1. List of ints of function evaluations.
396
                    2. List of ints of function/fitness values.
397
398
        """
399
        return self.evals, self.x_f_vals
400
401
402
class ThrowingTask(StoppingTask):
403
    r"""Task that throw exceptions when stopping condition is meet.
404
405
    See Also:
406
            * :class:`NiaPy.util.StoppingTask`
407
408
    """
409
410
    def __init__(self, **kwargs):
411
        r"""Initialize optimization task.
412
413
        Args:
414
                **kwargs (Dict[str, Any]): Additional arguments.
415
416
        See Also:
417
                * :func:`NiaPy.util.StoppingTask.__init__`
418
419
        """
420
421
        StoppingTask.__init__(self, **kwargs)
422
423
    def stopCondE(self):
424
        r"""Throw exception for the given stopping condition.
425
426
        Raises:
427
                * FesException: Thrown when the number of function/fitness evaluations is reached.
428
                * GenException: Thrown when the number of algorithms generations/iterations is reached.
429
                * RefException: Thrown when the reference values is reached.
430
                * TimeException: Thrown when algorithm exceeds time run limit.
431
432
        """
433
434
        # dtime = datetime.now() - self.startTime
435
        if self.Evals >= self.nFES:
436
            raise FesException()
437
        if self.Iters >= self.nGEN:
438
            raise GenException()
439
        # if self.runTime is not None and self.runTime >= dtime: raise TimeException()
440
        if self.refValue >= self.x_f:
441
            raise RefException()
442
443
    def eval(self, A):
444
        r"""Evaluate solution.
445
446
        Args:
447
                A (numpy.ndarray): Solution to evaluate.
448
449
        Returns:
450
                float: Function/fitness values of solution.
451
452
        See Also:
453
                * :func:`NiaPy.util.ThrowingTask.stopCondE`
454
                * :func:`NiaPy.util.StoppingTask.eval`
455
456
        """
457
458
        self.stopCondE()
459
        return StoppingTask.eval(self, A)
460
461
462
class MoveTask(StoppingTask):
463
    """Move task implementation."""
464
465
    def __init__(self, o=None, fo=None, M=None, fM=None, optF=None, **kwargs):
466
        r"""Initialize task class for optimization.
467
468
        Arguments:
469
                o (numpy.ndarray[Union[float, int]]): Array for shifting.
470
                of (Callable[numpy.ndarray[Union[float, int]]]): Function applied on shifted input.
471
                M (numpy.ndarray[Union[float, int]]): Matrix for rotating.
472
                fM (Callable[numpy.ndarray[Union[float, int]]]): Function applied after rotating.
473
474
        See Also:
475
                * :func:`NiaPy.util.StoppingTask.__init__`
476
477
        """
478
479
        StoppingTask.__init__(self, **kwargs)
480
        self.o = o if isinstance(o, ndarray) or o is None else asarray(o)
481
        self.M = M if isinstance(M, ndarray) or M is None else asarray(M)
482
        self.fo, self.fM, self.optF = fo, fM, optF
483
484
    def eval(self, A):
485
        r"""Evaluate the solution.
486
487
        Args:
488
                A (numpy.ndarray): Solution to evaluate
489
490
        Returns:
491
                float: Fitness/function value of solution.
492
493
        See Also:
494
                * :func:`NiaPy.util.StoppingTask.stopCond`
495
                * :func:`NiaPy.util.StoppingTask.eval`
496
497
        """
498
499
        if self.stopCond():
500
            return inf * self.optType.value
501
        X = A - self.o if self.o is not None else A
502
        X = self.fo(X) if self.fo is not None else X
503
        X = dot(X, self.M) if self.M is not None else X
504
        X = self.fM(X) if self.fM is not None else X
505
        r = StoppingTask.eval(self, X) + (self.optF if self.optF is not None else 0)
506
        if r <= self.x_f:
507
            self.x, self.x_f = A, r
508
        return r
509
510
511
class ScaledTask(Task):
512
    r"""Scaled task.
513
514
    Attributes:
515
            _task (Task): Optimization task with evaluation function.
516
            Lower (numpy.ndarray): Scaled lower limit of search space.
517
            Upper (numpy.ndarray): Scaled upper limit of search space.
518
519
    See Also:
520
            * :class:`NiaPy.util.Task`
521
522
    """
523
524
    def __init__(self, task, Lower, Upper, **kwargs):
525
        r"""Initialize scaled task.
526
527
        Args:
528
                task (Task): Optimization task to scale to new bounds.
529
                Lower (Union[float, int, numpy.ndarray]): New lower bounds.
530
                Upper (Union[float, int, numpy.ndarray]): New upper bounds.
531
                **kwargs (Dict[str, Any]): Additional arguments.
532
533
        See Also:
534
                * :func:`NiaPy.util.fullArray`
535
536
        """
537
538
        Task.__init__(self)
539
        self._task = task
540
        self.D = self._task.D
541
        self.Lower, self.Upper = fullArray(Lower, self.D), fullArray(Upper, self.D)
542
        self.bRange = fabs(Upper - Lower)
543
544
    def stopCond(self):
545
        r"""Test for stopping condition.
546
547
        This function uses `self._task` for checking the stopping criteria.
548
549
        Returns:
550
                bool: `True` if stopping condition is meet else `False`.
551
552
        """
553
554
        return self._task.stopCond()
555
556
    def stopCondI(self):
557
        r"""Test for stopping condition and increments the number of algorithm generations/iterations.
558
559
        This function uses `self._task` for checking the stopping criteria.
560
561
        Returns:
562
                bool: `True` if stopping condition is meet else `False`.
563
564
        """
565
566
        return self._task.stopCondI()
567
568
    def eval(self, A):
569
        r"""Evaluate solution.
570
571
        Args:
572
                A (numpy.ndarray): Solution for calculating function/fitness value.
573
574
        Returns:
575
                float: Function values of solution.
576
577
        """
578
579
        return self._task.eval(A)
580
581
    def evals(self):
582
        r"""Get the number of function evaluations.
583
584
        Returns:
585
                int: Number of function evaluations.
586
587
        """
588
589
        return self._task.evals()
590
591
    def iters(self):
592
        r"""Get the number of algorithms generations/iterations.
593
594
        Returns:
595
                int: Number of algorithms generations/iterations.
596
597
        """
598
599
        return self._task.iters()
600
601
    def nextIter(self):
602
        r"""Increment the number of iterations/generations.
603
604
        Function uses `self._task` to increment number of generations/iterations.
605
606
        """
607
608
        self._task.nextIter()
609
610
611
class TaskConvPlot():
612
    r"""Task class with ability of showing convergence graph.
613
614
    Attributes:
615
            iters (List[int]): List of ints representing when the new global best was found.
616
            x_fs (List[float]): List of floats representing function/fitness values found.
617
618
    See Also:
619
            * :class:`NiaPy.util.StoppingTask`
620
621
    """
622
623
    def __init__(self, **kwargs):
624
        r"""TODO.
625
626
        Args:
627
                **kwargs (Dict[str, Any]): Additional arguments.
628
629
        See Also:
630
                * :func:`NiaPy.util.StoppingTask.__init__`
631
632
        """
633
634
        StoppingTask.__init__(self, **kwargs)
635
        self.fig = plt.figure()
636
        self.ax = self.fig.subplots(nrows=1, ncols=1)
637
        self.ax.set_xlim(0, self.nFES)
638
        self.line, = self.ax.plot(self.iters, self.x_fs, animated=True)
639
        self.ani = anim.FuncAnimation(self.fig, self.updatePlot, blit=True)
640
        self.showPlot()
641
642
    def eval(self, A):
643
        r"""Evaluate solution.
644
645
        Args:
646
                A (numpy.ndarray): Solution to evaluate.
647
648
        Returns:
649
                float: Fitness/function values of solution.
650
651
        """
652
653
        x_f = StoppingTask.eval(self, A)
654
        if not self.x_f_vals:
655
            self.x_f_vals.append(x_f)
656
        elif x_f < self.x_f_vals[-1]:
657
            self.x_f_vals.append(x_f)
658
        else:
659
            self.x_f_vals.append(self.x_f_vals[-1])
660
        self.evals.append(self.Evals)
661
        return x_f
662
663
    def showPlot(self):
664
        r"""Animation updating function."""
665
        plt.show(block=False)
666
        plt.pause(0.001)
667
668
    def updatePlot(self, frame):
669
        r"""Update mathplotlib figure.
670
671
        Args:
672
                frame (): TODO
673
674
        Returns:
675
                Tuple[List[float], Any]:
676
                        1. Line
677
678
        """
679
680
        if self.x_f_vals:
681
            max_fs, min_fs = self.x_f_vals[0], self.x_f_vals[-1]
682
            self.ax.set_ylim(min_fs + 1, max_fs + 1)
683
            self.line.set_data(self.evals, self.x_f_vals)
684
        return self.line,
685
686
687
class TaskComposition(MoveTask):
688
    """Task compostion."""
689
690
    def __init__(self, benchmarks=None, rho=None, lamb=None, bias=None, **kwargs):
691
        r"""Initialize of composite function problem.
692
693
        Arguments:
694
                benchmarks (List[Benchmark]): Optimization function to use in composition
695
                delta (numpy.ndarray[float]): TODO
696
                lamb (numpy.ndarray[float]): TODO
697
                bias (numpy.ndarray[float]): TODO
698
699
        See Also:
700
                * :func:`NiaPy.util.MoveTask.__init__`
701
702
        TODO:
703
                Class is a work in progress.
704
705
        """
706
707
        MoveTask.__init__(self, **kwargs)
708
709
    def eval(self, A):
710
        r"""TODO.
711
712
        Args:
713
                A:
714
715
        Returns:
716
                float:
717
718
        Todo:
719
                Usage of multiple functions on the same time
720
721
        """
722
723
        return inf
724