Passed
Pull Request — master (#209)
by
unknown
02:03
created

NiaPy.task.task.Task.__init__()   C

Complexity

Conditions 10

Size

Total Lines 47
Code Lines 18

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 10
eloc 18
nop 8
dl 0
loc 47
rs 5.9999
c 0
b 0
f 0

How to fix   Complexity    Many Parameters   

Complexity

Complex classes like NiaPy.task.task.Task.__init__() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

Many Parameters

Methods with many parameters are not only hard to understand, but their parameters also often become inconsistent when you need more, or different data.

There are several approaches to avoid long parameter lists:

1
# encoding=utf8
2
3
"""The implementation of tasks."""
4
5
import logging
6
from enum import Enum
7
8
from matplotlib import pyplot as plt, animation as anim
9
from numpy import inf, random as rand
10
11
from NiaPy.util import (
12
    limit_repair,
13
    fullArray,
14
    FesException,
15
    GenException,
16
    RefException
17
)
18
from NiaPy.benchmarks.utility import Utility
19
20
21
logging.basicConfig()
22
logger = logging.getLogger("NiaPy.task.Task")
23
logger.setLevel("INFO")
24
25
26
class OptimizationType(Enum):
27
    r"""Enum representing type of optimization.
28
29
    Attributes:
30
            MINIMIZATION (int): Represents minimization problems and is default optimization type of all algorithms.
31
            MAXIMIZATION (int): Represents maximization problems.
32
33
    """
34
35
    MINIMIZATION = 1.0
36
    MAXIMIZATION = -1.0
37
38
39
class Task:
40
    r"""Class representing problem to solve with optimization.
41
42
    Date:
43
            2019
44
45
    Author:
46
            Klemen Berkovič and others
47
48
    Attributes:
49
            D (int): Dimension of the problem.
50
            Lower (numpy.ndarray): Lower bounds of the problem.
51
            Upper (numpy.ndarray): Upper bounds of the problem.
52
            bRange (numpy.ndarray): Search range between upper and lower limits.
53
            optType (OptimizationType): Optimization type to use.
54
55
    See Also:
56
            * :class:`NiaPy.util.Utility`
57
58
    """
59
60
    D = 0
61
    benchmark = None
62
    Lower, Upper, bRange = inf, inf, inf
63
    optType = OptimizationType.MINIMIZATION
64
65
    def __init__(self, D=0, optType=OptimizationType.MINIMIZATION, benchmark=None, Lower=None, Upper=None, frepair=limit_repair, **kwargs):
66
        r"""Initialize task class for optimization.
67
68
        Arguments:
69
                D (Optional[int]): Number of dimensions.
70
                optType (Optional[OptimizationType]): Set the type of optimization.
71
                benchmark (Union[str, Benchmark]): Problem to solve with optimization.
72
                Lower (Optional[numpy.ndarray]): Lower limits of the problem.
73
                Upper (Optional[numpy.ndarray]): Upper limits of the problem.
74
                frepair (Optional[Callable[[numpy.ndarray, numpy.ndarray, numpy.ndarray, Dict[str, Any]], numpy.ndarray]]): Function for reparing individuals components to desired limits.
75
76
        See Also:
77
                * `func`:NiaPy.util.Utility.__init__`
78
                * `func`:NiaPy.util.Utility.repair`
79
80
        """
81
82
        # dimension of the problem
83
        self.D = D
84
        # set optimization type
85
        self.optType = optType
86
        # set optimization function
87
        self.benchmark = Utility().get_benchmark(benchmark) if benchmark is not None else None
88
89
        if self.benchmark is not None:
90
            self.Fun = self.benchmark.function() if self.benchmark is not None else None
91
92
        # set Lower limits
93
        if Lower is not None:
94
            self.Lower = fullArray(Lower, self.D)
95
        elif Lower is None and benchmark is not None:
96
            self.Lower = fullArray(self.benchmark.Lower, self.D)
97
        else:
98
            self.Lower = fullArray(0, self.D)
99
100
        # set Upper limits
101
        if Upper is not None:
102
            self.Upper = fullArray(Upper, self.D)
103
        elif Upper is None and benchmark is not None:
104
            self.Upper = fullArray(self.benchmark.Upper, self.D)
105
        else:
106
            self.Upper = fullArray(0, self.D)
107
108
        # set range
109
        self.bRange = self.Upper - self.Lower
110
        # set repair function
111
        self.frepair = frepair
112
113
    def dim(self):
114
        r"""Get the number of dimensions.
115
116
        Returns:
117
                int: Dimension of problem optimizing.
118
119
        """
120
121
        return self.D
122
123
    def bcLower(self):
124
        r"""Get the array of lower bound constraint.
125
126
        Returns:
127
                numpy.ndarray: Lower bound.
128
129
        """
130
131
        return self.Lower
132
133
    def bcUpper(self):
134
        r"""Get the array of upper bound constraint.
135
136
        Returns:
137
                numpy.ndarray: Upper bound.
138
139
        """
140
141
        return self.Upper
142
143
    def bcRange(self):
144
        r"""Get the range of bound constraint.
145
146
        Returns:
147
                numpy.ndarray: Range between lower and upper bound.
148
149
        """
150
151
        return self.Upper - self.Lower
152
153
    def repair(self, x, rnd=rand):
154
        r"""Repair solution and put the solution in the random position inside of the bounds of problem.
155
156
        Arguments:
157
                x (numpy.ndarray): Solution to check and repair if needed.
158
                rnd (mtrand.RandomState): Random number generator.
159
160
        Returns:
161
                numpy.ndarray: Fixed solution.
162
163
        See Also:
164
                * :func:`NiaPy.util.limitRepair`
165
                * :func:`NiaPy.util.limitInversRepair`
166
                * :func:`NiaPy.util.wangRepair`
167
                * :func:`NiaPy.util.randRepair`
168
                * :func:`NiaPy.util.reflectRepair`
169
170
        """
171
172
        return self.frepair(x, self.Lower, self.Upper, rnd=rnd)
173
174
    def nextIter(self):
175
        r"""Increments the number of algorithm iterations."""
176
177
    def start(self):
178
        r"""Start stopwatch."""
179
180
    def eval(self, A):
181
        r"""Evaluate the solution A.
182
183
        Arguments:
184
                A (numpy.ndarray): Solution to evaluate.
185
186
        Returns:
187
                float: Fitness/function values of solution.
188
189
        """
190
191
        return self.Fun(self.D, A) * self.optType.value
192
193
    def isFeasible(self, A):
194
        r"""Check if the solution is feasible.
195
196
        Arguments:
197
                A (Union[numpy.ndarray, Individual]): Solution to check for feasibility.
198
199
        Returns:
200
                bool: `True` if solution is in feasible space else `False`.
201
202
        """
203
204
        return False not in (A >= self.Lower) and False not in (A <= self.Upper)
205
206
    def stopCond(self):
207
        r"""Check if optimization task should stop.
208
209
        Returns:
210
                bool: `True` if stopping condition is meet else `False`.
211
212
        """
213
214
        return False
215
216
217
class CountingTask(Task):
218
    r"""Optimization task with added counting of function evaluations and algorithm iterations/generations.
219
220
    Attributes:
221
            Iters (int): Number of algorithm iterations/generations.
222
            Evals (int): Number of function evaluations.
223
224
    See Also:
225
            * :class:`NiaPy.util.Task`
226
227
    """
228
229
    def __init__(self, **kwargs):
230
        r"""Initialize counting task.
231
232
        Args:
233
                **kwargs (Dict[str, Any]): Additional arguments.
234
235
        See Also:
236
                * :func:`NiaPy.util.Task.__init__`
237
238
        """
239
240
        Task.__init__(self, **kwargs)
241
        self.Iters, self.Evals = 0, 0
242
243
    def eval(self, A):
244
        r"""Evaluate the solution A.
245
246
        This function increments function evaluation counter `self.Evals`.
247
248
        Arguments:
249
                A (numpy.ndarray): Solutions to evaluate.
250
251
        Returns:
252
                float: Fitness/function values of solution.
253
254
        See Also:
255
                * :func:`NiaPy.util.Task.eval`
256
257
        """
258
259
        r = Task.eval(self, A)
260
        self.Evals += 1
261
        return r
262
263
    def evals(self):
264
        r"""Get the number of evaluations made.
265
266
        Returns:
267
                int: Number of evaluations made.
268
269
        """
270
271
        return self.Evals
272
273
    def iters(self):
274
        r"""Get the number of algorithm iteratins made.
275
276
        Returns:
277
                int: Number of generations/iterations made by algorithm.
278
279
        """
280
281
        return self.Iters
282
283
    def nextIter(self):
284
        r"""Increases the number of algorithm iterations made.
285
286
        This function increments number of algorithm iterations/generations counter `self.Iters`.
287
288
        """
289
290
        self.Iters += 1
291
292
293
class StoppingTask(CountingTask):
294
    r"""Optimization task with implemented checking for stopping criterias.
295
296
    Attributes:
297
            nGEN (int): Maximum number of algorithm iterations/generations.
298
            nFES (int): Maximum number of function evaluations.
299
            refValue (float): Reference function/fitness values to reach in optimization.
300
            x (numpy.ndarray): Best found individual.
301
            x_f (float): Best found individual function/fitness value.
302
303
    See Also:
304
            * :class:`NiaPy.util.CountingTask`
305
306
    """
307
308
    def __init__(self, nFES=inf, nGEN=inf, refValue=None, logger=False, **kwargs):
309
        r"""Initialize task class for optimization.
310
311
        Arguments:
312
                nFES (Optional[int]): Number of function evaluations.
313
                nGEN (Optional[int]): Number of generations or iterations.
314
                refValue (Optional[float]): Reference value of function/fitness function.
315
316
        Note:
317
                Storing improvements during the evolutionary cycle is
318
                captured in self.n_evals and self.x_f_vals
319
320
        See Also:
321
                * :func:`NiaPy.util.CountingTask.__init__`
322
323
        """
324
325
        CountingTask.__init__(self, **kwargs)
326
        self.refValue = (-inf if refValue is None else refValue)
327
        self.logger = logger
328
        self.x, self.x_f = None, inf
329
        self.nFES, self.nGEN = nFES, nGEN
330
        self.n_evals = []
331
        self.x_f_vals = []
332
333
    def eval(self, A):
334
        r"""Evaluate solution.
335
336
        Args:
337
                A (numpy.ndarray): Solution to evaluate.
338
339
        Returns:
340
                float: Fitness/function value of solution.
341
342
        See Also:
343
                * :func:`NiaPy.util.StoppingTask.stopCond`
344
                * :func:`NiaPy.util.CountingTask.eval`
345
346
        """
347
348
        if self.stopCond():
349
            return inf * self.optType.value
350
351
        x_f = CountingTask.eval(self, A)
352
353
        if x_f < self.x_f:
354
            self.x_f = x_f
355
            self.n_evals.append(self.Evals)
356
            self.x_f_vals.append(x_f)
357
            if self.logger:
358
                logger.info('nFES:%d => %s' % (self.Evals, self.x_f))
359
360
        return x_f
361
362
    def stopCond(self):
363
        r"""Check if stopping condition reached.
364
365
        Returns:
366
                bool: `True` if number of function evaluations or number of algorithm iterations/generations or reference values is reach else `False`
367
368
        """
369
370
        return (self.Evals >= self.nFES) or (self.Iters >= self.nGEN) or (self.refValue > self.x_f)
371
372
    def stopCondI(self):
373
        r"""Check if stopping condition reached and increase number of iterations.
374
375
        Returns:
376
                bool: `True` if number of function evaluations or number of algorithm iterations/generations or reference values is reach else `False`.
377
378
        See Also:
379
                * :func:`NiaPy.util.StoppingTask.stopCond`
380
                * :func:`NiaPy.util.CountingTask.nextIter`
381
382
        """
383
384
        r = self.stopCond()
385
        CountingTask.nextIter(self)
386
        return r
387
388
    def return_conv(self):
389
        r"""Get values of x and y axis for plotting covariance graph.
390
391
        Returns:
392
                Tuple[List[int], List[float]]:
393
                    1. List of ints of function evaluations.
394
                    2. List of ints of function/fitness values.
395
396
        """
397
        return self.evals, self.x_f_vals
398
399
400
class ThrowingTask(StoppingTask):
401
    r"""Task that throw exceptions when stopping condition is meet.
402
403
    See Also:
404
            * :class:`NiaPy.util.StoppingTask`
405
406
    """
407
408
    def __init__(self, **kwargs):
409
        r"""Initialize optimization task.
410
411
        Args:
412
                **kwargs (Dict[str, Any]): Additional arguments.
413
414
        See Also:
415
                * :func:`NiaPy.util.StoppingTask.__init__`
416
417
        """
418
419
        StoppingTask.__init__(self, **kwargs)
420
421
    def stopCondE(self):
422
        r"""Throw exception for the given stopping condition.
423
424
        Raises:
425
                * FesException: Thrown when the number of function/fitness evaluations is reached.
426
                * GenException: Thrown when the number of algorithms generations/iterations is reached.
427
                * RefException: Thrown when the reference values is reached.
428
                * TimeException: Thrown when algorithm exceeds time run limit.
429
430
        """
431
432
        # dtime = datetime.now() - self.startTime
433
        if self.Evals >= self.nFES:
434
            raise FesException()
435
        if self.Iters >= self.nGEN:
436
            raise GenException()
437
        # if self.runTime is not None and self.runTime >= dtime: raise TimeException()
438
        if self.refValue >= self.x_f:
439
            raise RefException()
440
441
    def eval(self, A):
442
        r"""Evaluate solution.
443
444
        Args:
445
                A (numpy.ndarray): Solution to evaluate.
446
447
        Returns:
448
                float: Function/fitness values of solution.
449
450
        See Also:
451
                * :func:`NiaPy.util.ThrowingTask.stopCondE`
452
                * :func:`NiaPy.util.StoppingTask.eval`
453
454
        """
455
456
        self.stopCondE()
457
        return StoppingTask.eval(self, A)
458
459
460
class TaskConvPlot():
461
    r"""Task class with ability of showing convergence graph.
462
463
    Attributes:
464
            iters (List[int]): List of ints representing when the new global best was found.
465
            x_fs (List[float]): List of floats representing function/fitness values found.
466
467
    See Also:
468
            * :class:`NiaPy.util.StoppingTask`
469
470
    """
471
472
    def __init__(self, **kwargs):
473
        r"""TODO.
474
475
        Args:
476
                **kwargs (Dict[str, Any]): Additional arguments.
477
478
        See Also:
479
                * :func:`NiaPy.util.StoppingTask.__init__`
480
481
        """
482
483
        StoppingTask.__init__(self, **kwargs)
484
        self.fig = plt.figure()
485
        self.ax = self.fig.subplots(nrows=1, ncols=1)
486
        self.ax.set_xlim(0, self.nFES)
487
        self.line, = self.ax.plot(self.iters, self.x_fs, animated=True)
488
        self.ani = anim.FuncAnimation(self.fig, self.updatePlot, blit=True)
489
        self.showPlot()
490
491
    def eval(self, A):
492
        r"""Evaluate solution.
493
494
        Args:
495
                A (numpy.ndarray): Solution to evaluate.
496
497
        Returns:
498
                float: Fitness/function values of solution.
499
500
        """
501
502
        x_f = StoppingTask.eval(self, A)
503
        if not self.x_f_vals:
504
            self.x_f_vals.append(x_f)
505
        elif x_f < self.x_f_vals[-1]:
506
            self.x_f_vals.append(x_f)
507
        else:
508
            self.x_f_vals.append(self.x_f_vals[-1])
509
        self.evals.append(self.Evals)
510
        return x_f
511
512
    def showPlot(self):
513
        r"""Animation updating function."""
514
        plt.show(block=False)
515
        plt.pause(0.001)
516
517
    def updatePlot(self, frame):
518
        r"""Update mathplotlib figure.
519
520
        Args:
521
                frame (): TODO
522
523
        Returns:
524
                Tuple[List[float], Any]:
525
                        1. Line
526
527
        """
528
529
        if self.x_f_vals:
530
            max_fs, min_fs = self.x_f_vals[0], self.x_f_vals[-1]
531
            self.ax.set_ylim(min_fs + 1, max_fs + 1)
532
            self.line.set_data(self.evals, self.x_f_vals)
533
        return self.line,
534