NiaPy.task.task.Task.__init__()   C
last analyzed

Complexity

Conditions 10

Size

Total Lines 47
Code Lines 18

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 10
eloc 18
nop 8
dl 0
loc 47
rs 5.9999
c 0
b 0
f 0

How to fix   Complexity    Many Parameters   

Complexity

Complex classes like NiaPy.task.task.Task.__init__() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

Many Parameters

Methods with many parameters are not only hard to understand, but their parameters also often become inconsistent when you need more, or different data.

There are several approaches to avoid long parameter lists:

1
# encoding=utf8
2
3
"""The implementation of tasks."""
4
5
import logging
6
from enum import Enum
7
8
from matplotlib import pyplot as plt
9
from numpy import inf, random as rand
10
11
from NiaPy.util.utility import (
12
    limit_repair,
13
    fullArray
14
)
15
from NiaPy.util.exception import (
16
    FesException,
17
    GenException,
18
    RefException
19
)
20
from NiaPy.task.utility import Utility
21
22
23
logging.basicConfig()
24
logger = logging.getLogger("NiaPy.task.Task")
25
logger.setLevel("INFO")
26
27
28
class OptimizationType(Enum):
29
    r"""Enum representing type of optimization.
30
31
    Attributes:
32
            MINIMIZATION (int): Represents minimization problems and is default optimization type of all algorithms.
33
            MAXIMIZATION (int): Represents maximization problems.
34
35
    """
36
37
    MINIMIZATION = 1.0
38
    MAXIMIZATION = -1.0
39
40
41
class Task:
42
    r"""Class representing problem to solve with optimization.
43
44
    Date:
45
            2019
46
47
    Author:
48
            Klemen Berkovič and others
49
50
    Attributes:
51
            D (int): Dimension of the problem.
52
            Lower (numpy.ndarray): Lower bounds of the problem.
53
            Upper (numpy.ndarray): Upper bounds of the problem.
54
            bRange (numpy.ndarray): Search range between upper and lower limits.
55
            optType (OptimizationType): Optimization type to use.
56
57
    See Also:
58
            * :class:`NiaPy.util.Utility`
59
60
    """
61
62
    D = 0
63
    benchmark = None
64
    Lower, Upper, bRange = inf, inf, inf
65
    optType = OptimizationType.MINIMIZATION
66
67
    def __init__(self, D=0, optType=OptimizationType.MINIMIZATION, benchmark=None, Lower=None, Upper=None, frepair=limit_repair, **kwargs):
68
        r"""Initialize task class for optimization.
69
70
        Arguments:
71
                D (Optional[int]): Number of dimensions.
72
                optType (Optional[OptimizationType]): Set the type of optimization.
73
                benchmark (Union[str, Benchmark]): Problem to solve with optimization.
74
                Lower (Optional[numpy.ndarray]): Lower limits of the problem.
75
                Upper (Optional[numpy.ndarray]): Upper limits of the problem.
76
                frepair (Optional[Callable[[numpy.ndarray, numpy.ndarray, numpy.ndarray, Dict[str, Any]], numpy.ndarray]]): Function for reparing individuals components to desired limits.
77
78
        See Also:
79
                * `func`:NiaPy.util.Utility.__init__`
80
                * `func`:NiaPy.util.Utility.repair`
81
82
        """
83
84
        # dimension of the problem
85
        self.D = D
86
        # set optimization type
87
        self.optType = optType
88
        # set optimization function
89
        self.benchmark = Utility().get_benchmark(benchmark) if benchmark is not None else None
90
91
        if self.benchmark is not None:
92
            self.Fun = self.benchmark.function() if self.benchmark is not None else None
93
94
        # set Lower limits
95
        if Lower is not None:
96
            self.Lower = fullArray(Lower, self.D)
97
        elif Lower is None and benchmark is not None:
98
            self.Lower = fullArray(self.benchmark.Lower, self.D)
99
        else:
100
            self.Lower = fullArray(0, self.D)
101
102
        # set Upper limits
103
        if Upper is not None:
104
            self.Upper = fullArray(Upper, self.D)
105
        elif Upper is None and benchmark is not None:
106
            self.Upper = fullArray(self.benchmark.Upper, self.D)
107
        else:
108
            self.Upper = fullArray(0, self.D)
109
110
        # set range
111
        self.bRange = self.Upper - self.Lower
112
        # set repair function
113
        self.frepair = frepair
114
115
    def dim(self):
116
        r"""Get the number of dimensions.
117
118
        Returns:
119
                int: Dimension of problem optimizing.
120
121
        """
122
123
        return self.D
124
125
    def bcLower(self):
126
        r"""Get the array of lower bound constraint.
127
128
        Returns:
129
                numpy.ndarray: Lower bound.
130
131
        """
132
133
        return self.Lower
134
135
    def bcUpper(self):
136
        r"""Get the array of upper bound constraint.
137
138
        Returns:
139
                numpy.ndarray: Upper bound.
140
141
        """
142
143
        return self.Upper
144
145
    def bcRange(self):
146
        r"""Get the range of bound constraint.
147
148
        Returns:
149
                numpy.ndarray: Range between lower and upper bound.
150
151
        """
152
153
        return self.Upper - self.Lower
154
155
    def repair(self, x, rnd=rand):
156
        r"""Repair solution and put the solution in the random position inside of the bounds of problem.
157
158
        Arguments:
159
                x (numpy.ndarray): Solution to check and repair if needed.
160
                rnd (mtrand.RandomState): Random number generator.
161
162
        Returns:
163
                numpy.ndarray: Fixed solution.
164
165
        See Also:
166
                * :func:`NiaPy.util.limitRepair`
167
                * :func:`NiaPy.util.limitInversRepair`
168
                * :func:`NiaPy.util.wangRepair`
169
                * :func:`NiaPy.util.randRepair`
170
                * :func:`NiaPy.util.reflectRepair`
171
172
        """
173
174
        return self.frepair(x, self.Lower, self.Upper, rnd=rnd)
175
176
    def nextIter(self):
177
        r"""Increments the number of algorithm iterations."""
178
179
    def start(self):
180
        r"""Start stopwatch."""
181
182
    def eval(self, A):
183
        r"""Evaluate the solution A.
184
185
        Arguments:
186
                A (numpy.ndarray): Solution to evaluate.
187
188
        Returns:
189
                float: Fitness/function values of solution.
190
191
        """
192
193
        return self.Fun(self.D, A) * self.optType.value
194
195
    def isFeasible(self, A):
196
        r"""Check if the solution is feasible.
197
198
        Arguments:
199
                A (Union[numpy.ndarray, Individual]): Solution to check for feasibility.
200
201
        Returns:
202
                bool: `True` if solution is in feasible space else `False`.
203
204
        """
205
206
        return False not in (A >= self.Lower) and False not in (A <= self.Upper)
207
208
    def stopCond(self):
209
        r"""Check if optimization task should stop.
210
211
        Returns:
212
                bool: `True` if stopping condition is meet else `False`.
213
214
        """
215
216
        return False
217
218
219
class CountingTask(Task):
220
    r"""Optimization task with added counting of function evaluations and algorithm iterations/generations.
221
222
    Attributes:
223
            Iters (int): Number of algorithm iterations/generations.
224
            Evals (int): Number of function evaluations.
225
226
    See Also:
227
            * :class:`NiaPy.util.Task`
228
229
    """
230
231
    def __init__(self, **kwargs):
232
        r"""Initialize counting task.
233
234
        Args:
235
                **kwargs (Dict[str, Any]): Additional arguments.
236
237
        See Also:
238
                * :func:`NiaPy.util.Task.__init__`
239
240
        """
241
242
        Task.__init__(self, **kwargs)
243
        self.Iters, self.Evals = 0, 0
244
245
    def eval(self, A):
246
        r"""Evaluate the solution A.
247
248
        This function increments function evaluation counter `self.Evals`.
249
250
        Arguments:
251
                A (numpy.ndarray): Solutions to evaluate.
252
253
        Returns:
254
                float: Fitness/function values of solution.
255
256
        See Also:
257
                * :func:`NiaPy.util.Task.eval`
258
259
        """
260
261
        r = Task.eval(self, A)
262
        self.Evals += 1
263
        return r
264
265
    def evals(self):
266
        r"""Get the number of evaluations made.
267
268
        Returns:
269
                int: Number of evaluations made.
270
271
        """
272
273
        return self.Evals
274
275
    def iters(self):
276
        r"""Get the number of algorithm iteratins made.
277
278
        Returns:
279
                int: Number of generations/iterations made by algorithm.
280
281
        """
282
283
        return self.Iters
284
285
    def nextIter(self):
286
        r"""Increases the number of algorithm iterations made.
287
288
        This function increments number of algorithm iterations/generations counter `self.Iters`.
289
290
        """
291
292
        self.Iters += 1
293
294
295
class StoppingTask(CountingTask):
296
    r"""Optimization task with implemented checking for stopping criterias.
297
298
    Attributes:
299
            nGEN (int): Maximum number of algorithm iterations/generations.
300
            nFES (int): Maximum number of function evaluations.
301
            refValue (float): Reference function/fitness values to reach in optimization.
302
            x (numpy.ndarray): Best found individual.
303
            x_f (float): Best found individual function/fitness value.
304
305
    See Also:
306
            * :class:`NiaPy.util.CountingTask`
307
308
    """
309
310
    def __init__(self, nFES=inf, nGEN=inf, refValue=None, logger=False, **kwargs):
311
        r"""Initialize task class for optimization.
312
313
        Arguments:
314
                nFES (Optional[int]): Number of function evaluations.
315
                nGEN (Optional[int]): Number of generations or iterations.
316
                refValue (Optional[float]): Reference value of function/fitness function.
317
                logger (Optional[bool]): Enable/disable logging of improvements.
318
319
        Note:
320
                Storing improvements during the evolutionary cycle is
321
                captured in self.n_evals and self.x_f_vals
322
323
        See Also:
324
                * :func:`NiaPy.util.CountingTask.__init__`
325
326
        """
327
328
        CountingTask.__init__(self, **kwargs)
329
        self.refValue = (-inf if refValue is None else refValue)
330
        self.logger = logger
331
        self.x, self.x_f = None, inf
332
        self.nFES, self.nGEN = nFES, nGEN
333
        self.n_evals = []
334
        self.x_f_vals = []
335
336
    def eval(self, A):
337
        r"""Evaluate solution.
338
339
        Args:
340
                A (numpy.ndarray): Solution to evaluate.
341
342
        Returns:
343
                float: Fitness/function value of solution.
344
345
        See Also:
346
                * :func:`NiaPy.util.StoppingTask.stopCond`
347
                * :func:`NiaPy.util.CountingTask.eval`
348
349
        """
350
351
        if self.stopCond():
352
            return inf * self.optType.value
353
354
        x_f = CountingTask.eval(self, A)
355
356
        if x_f < self.x_f:
357
            self.x_f = x_f
358
            self.n_evals.append(self.Evals)
359
            self.x_f_vals.append(x_f)
360
            if self.logger:
361
                logger.info('nFES:%d => %s' % (self.Evals, self.x_f))
362
363
        return x_f
364
365
    def stopCond(self):
366
        r"""Check if stopping condition reached.
367
368
        Returns:
369
                bool: `True` if number of function evaluations or number of algorithm iterations/generations or reference values is reach else `False`.
370
371
        """
372
373
        return (self.Evals >= self.nFES) or (self.Iters >= self.nGEN) or (self.refValue > self.x_f)
374
375
    def stopCondI(self):
376
        r"""Check if stopping condition reached and increase number of iterations.
377
378
        Returns:
379
                bool: `True` if number of function evaluations or number of algorithm iterations/generations or reference values is reach else `False`.
380
381
        See Also:
382
                * :func:`NiaPy.util.StoppingTask.stopCond`
383
                * :func:`NiaPy.util.CountingTask.nextIter`
384
385
        """
386
387
        r = self.stopCond()
388
        CountingTask.nextIter(self)
389
        return r
390
391
    def return_conv(self):
392
        r"""Get values of x and y axis for plotting covariance graph.
393
394
        Returns:
395
                Tuple[List[int], List[float]]:
396
                    1. List of ints of function evaluations.
397
                    2. List of ints of function/fitness values.
398
399
        """
400
        r1, r2 = [], []
401
        for i, v in enumerate(self.n_evals):
402
            r1.append(v), r2.append(self.x_f_vals[i])
403
            if i >= len(self.n_evals) - 1: break
404
            diff = self.n_evals[i + 1] - v
405
            if diff <= 1: continue
406
            for j in range(diff - 1): r1.append(v + j + 1), r2.append(self.x_f_vals[i])
407
        return r1, r2
408
409
    def plot(self):
410
        """Plot a simple convergence graph."""
411
        fess, fitnesses = self.return_conv()
412
        plt.plot(fess, fitnesses)
413
        plt.xlabel('nFes')
414
        plt.ylabel('Fitness')
415
        plt.title('Convergence graph')
416
        plt.show()
417
418
419
class ThrowingTask(StoppingTask):
420
    r"""Task that throw exceptions when stopping condition is meet.
421
422
    See Also:
423
            * :class:`NiaPy.util.StoppingTask`
424
425
    """
426
427
    def __init__(self, **kwargs):
428
        r"""Initialize optimization task.
429
430
        Args:
431
                **kwargs (Dict[str, Any]): Additional arguments.
432
433
        See Also:
434
                * :func:`NiaPy.util.StoppingTask.__init__`
435
436
        """
437
438
        StoppingTask.__init__(self, **kwargs)
439
440
    def stopCondE(self):
441
        r"""Throw exception for the given stopping condition.
442
443
        Raises:
444
                * FesException: Thrown when the number of function/fitness evaluations is reached.
445
                * GenException: Thrown when the number of algorithms generations/iterations is reached.
446
                * RefException: Thrown when the reference values is reached.
447
                * TimeException: Thrown when algorithm exceeds time run limit.
448
449
        """
450
451
        # dtime = datetime.now() - self.startTime
452
        if self.Evals >= self.nFES:
453
            raise FesException()
454
        if self.Iters >= self.nGEN:
455
            raise GenException()
456
        # if self.runTime is not None and self.runTime >= dtime: raise TimeException()
457
        if self.refValue >= self.x_f:
458
            raise RefException()
459
460
    def eval(self, A):
461
        r"""Evaluate solution.
462
463
        Args:
464
                A (numpy.ndarray): Solution to evaluate.
465
466
        Returns:
467
                float: Function/fitness values of solution.
468
469
        See Also:
470
                * :func:`NiaPy.util.ThrowingTask.stopCondE`
471
                * :func:`NiaPy.util.StoppingTask.eval`
472
473
        """
474
475
        self.stopCondE()
476
        return StoppingTask.eval(self, A)
477