Passed
Pull Request — master (#233)
by
unknown
01:19
created

EvolutionStrategyML.newPop()   A

Complexity

Conditions 4

Size

Total Lines 14
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 4
eloc 6
nop 2
dl 0
loc 14
rs 10
c 0
b 0
f 0
1
# encoding=utf8
2
# pylint: disable=mixed-indentation, trailing-whitespace, multiple-statements, attribute-defined-outside-init, logging-not-lazy, unused-argument, singleton-comparison, no-else-return, line-too-long, arguments-differ, no-self-use, superfluous-parens, redefined-builtin, bad-continuation, unused-variable, assignment-from-no-return
3
import logging
4
from math import ceil
5
6
from numpy import argmin, argsort, log, sum, fmax, sqrt, full, exp, eye, diag, apply_along_axis, round, any, asarray, dot, random as rand, tile, inf, where, append
7
from numpy.linalg import norm, cholesky as chol, eig, solve, lstsq
8
9
from NiaPy.algorithms.algorithm import Algorithm, Individual, defaultIndividualInit
10
from NiaPy.util.utility import objects2array
11
12
# Install the default root handler and expose this module's logger at INFO level.
logging.basicConfig()
logger = logging.getLogger('NiaPy.algorithms.basic')
logger.setLevel(logging.INFO)
15
16
# Public classes exported by this module.
__all__ = [
	'EvolutionStrategy1p1',
	'EvolutionStrategyMp1',
	'EvolutionStrategyMpL',
	'EvolutionStrategyML',
	'CovarianceMatrixAdaptionEvolutionStrategy'
]
17
18
class IndividualES(Individual):
	r"""Individual used by the evolution strategy algorithms.

	Adds the attribute ``rho`` to the base individual. The algorithms in this module pass it to ``mutate`` as the standard deviation.

	Attributes:
		rho (float): Value read from the ``rho`` keyword argument, 1 when it is not given.

	See Also:
		* :class:`NiaPy.algorithms.Individual`
	"""
	def __init__(self, **kwargs):
		r"""Create the individual and attach ``rho`` to it.

		Args:
			kwargs (Dict[str, Any]): Additional arguments. All of them are forwarded to the base class; ``rho`` is additionally stored on the instance.

		See Also:
			* :func:`NiaPy.algorithms.Individual.__init__`
		"""
		Individual.__init__(self, **kwargs)
		rho = kwargs['rho'] if 'rho' in kwargs else 1
		self.rho = rho
35
36
class EvolutionStrategy1p1(Algorithm):
	r"""Implementation of (1 + 1) evolution strategy algorithm. Uses just one individual.

	Algorithm:
		(1 + 1) Evolution Strategy Algorithm

	Date:
		2018

	Authors:
		Klemen Berkovič

	License:
		MIT

	Reference URL:

	Reference paper:

	Attributes:
		Name (List[str]): List of strings representing algorithm names.
		mu (int): Number of parents.
		k (int): Number of iterations before checking and fixing rho.
		c_a (float): Search range amplification factor.
		c_r (float): Search range reduction factor.
		epsilon (float): Threshold for rho. When rho is not greater than it, a rho update resets rho to 1.

	See Also:
		* :class:`NiaPy.algorithms.Algorithm`
	"""
	Name = ['EvolutionStrategy1p1', 'EvolutionStrategy(1+1)', 'ES(1+1)']

	@staticmethod
	def typeParameters():
		r"""Get dictionary with functions for checking values of parameters.

		Returns:
			Dict[str, Callable]:
				* mu (Callable[[int], bool])
				* k (Callable[[int], bool])
				* c_a (Callable[[Union[float, int]], bool])
				* c_r (Callable[[Union[float, int]], bool])
				* epsilon (Callable[[float], bool])
		"""
		return {
			'mu': lambda x: isinstance(x, int) and x > 0,
			'k': lambda x: isinstance(x, int) and x > 0,
			'c_a': lambda x: isinstance(x, (float, int)) and x > 1,
			'c_r': lambda x: isinstance(x, (float, int)) and 0 < x < 1,
			'epsilon': lambda x: isinstance(x, float) and 0 < x < 1
		}

	def setParameters(self, mu=1, k=10, c_a=1.1, c_r=0.5, epsilon=1e-20, **ukwargs):
		r"""Set the arguments of an algorithm.

		Arguments:
			mu (Optional[int]): Number of parents
			k (Optional[int]): Number of iterations before checking and fixing rho
			c_a (Optional[float]): Search range amplification factor
			c_r (Optional[float]): Search range reduction factor
			epsilon (Optional[float]): Threshold under which rho is reset to 1 on update
			**ukwargs (Dict[str, Any]): Additional arguments; ``itype`` defaults to :class:`IndividualES`

		See Also:
			* :func:`NiaPy.algorithms.Algorithm.setParameters`
		"""
		Algorithm.setParameters(self, NP=mu, itype=ukwargs.pop('itype', IndividualES), **ukwargs)
		self.mu, self.k, self.c_a, self.c_r, self.epsilon = mu, k, c_a, c_r, epsilon

	def mutate(self, x, rho):
		r"""Mutate individual.

		Args:
			x (Individual): Current individual.
			rho (float): Current standard deviation.

		Returns:
			Individual: Mutated individual.
		"""
		return x + self.normal(0, rho, len(x))

	def updateRho(self, rho, k):
		r"""Update standard deviation.

		The success ratio ``k / self.k`` is compared against 0.2: below it rho is multiplied by ``c_r``, above it by ``c_a``, and at exactly 0.2 rho is returned unchanged. When rho has to change but is not greater than ``epsilon``, 1 is returned instead.

		Args:
			rho (float): Current standard deviation.
			k (int): Number of successful mutations.

		Returns:
			float: New standard deviation.
		"""
		# float() keeps the ratio fractional when both operands are ints under Python 2
		phi = float(k) / self.k
		if phi == 0.2: return rho
		factor = self.c_r if phi < 0.2 else self.c_a
		return factor * rho if rho > self.epsilon else 1

	def initPopulation(self, task):
		r"""Initialize starting individual.

		Args:
			task (Task): Optimization task.

		Returns:
			Tuple[Individual, float, Dict[str, Any]]:
				1. Initialized individual.
				2. Initialized individual fitness/function value.
				3. Additional arguments:
					* ki (int): Number of successful mutations since the last rho update, starts at 0.
		"""
		c, ki = IndividualES(task=task, rnd=self.Rand), 0
		return c, c.f, {'ki': ki}

	def runIteration(self, task, c, fpop, xb, fxb, ki, **dparams):
		r"""Core function of EvolutionStrategy(1+1) algorithm.

		Args:
			task (Task): Optimization task.
			c (Individual): Current position.
			fpop (float): Current position function/fitness value.
			xb (Individual): Global best position.
			fxb (float): Global best function/fitness value.
			ki (int): Number of successful updates before rho update.
			**dparams (Dict[str, Any]): Additional arguments.

		Returns:
			Tuple[Individual, float, Individual, float, Dict[str, Any]]:
				1. Current individual (updated in place when a mutant is better).
				2. Current individual fitness/function value.
				3. New global best solution.
				4. New global best solutions fitness/objective value.
				5. Additional arguments:
					* ki (int): Number of successful mutations since the last rho update.
		"""
		# Every k-th iteration adapt rho from the success count and restart the count
		if task.Iters % self.k == 0: c.rho, ki = self.updateRho(c.rho, ki), 0
		# Create mu repaired mutants of the current individual and evaluate them
		cn = objects2array([task.repair(self.mutate(c.x, c.rho), self.Rand) for _i in range(self.mu)])
		cn_f = asarray([task.eval(x) for x in cn])
		ib = argmin(cn_f)
		# Only the best mutant can replace the current individual
		if cn_f[ib] < c.f:
			c.x, c.f, ki = cn[ib], cn_f[ib], ki + 1
			if cn_f[ib] < fxb: xb, fxb = self.getBest(cn[ib], cn_f[ib], xb, fxb)
		return c, c.f, xb, fxb, {'ki': ki}
174
175
class EvolutionStrategyMp1(EvolutionStrategy1p1):
	r"""Implementation of (mu + 1) evolution strategy algorithm. Algorithm creates mu mutants but into new generation goes only one individual.

	Algorithm:
		(:math:`\mu + 1`) Evolution Strategy Algorithm

	Date:
		2018

	Authors:
		Klemen Berkovič

	License:
		MIT

	Reference URL:

	Reference paper:

	Attributes:
		Name (List[str]): List of strings representing algorithm names.

	See Also:
		* :class:`NiaPy.algorithms.basic.EvolutionStrategy1p1`
	"""
	Name = ['EvolutionStrategyMp1', 'EvolutionStrategy(mu+1)', 'ES(m+1)']

	def setParameters(self, **kwargs):
		r"""Set core parameters of EvolutionStrategy(mu+1) algorithm.

		Differs from the parent only in the default of ``mu``, which is 40 here.

		Args:
			**kwargs (Dict[str, Any]): Arguments forwarded to the parent implementation.

		See Also:
			* :func:`NiaPy.algorithms.basic.EvolutionStrategy1p1.setParameters`
		"""
		# Fill in this class's default only when the caller did not supply mu
		kwargs.setdefault('mu', 40)
		EvolutionStrategy1p1.setParameters(self, **kwargs)
213
214
class EvolutionStrategyMpL(EvolutionStrategy1p1):
	r"""Implementation of (mu + lambda) evolution strategy algorithm. Mutation creates lambda individuals. Lambda individuals compete with mu individuals for survival, so only mu individuals go to new generation.

	Algorithm:
		(:math:`\mu + \lambda`) Evolution Strategy Algorithm

	Date:
		2018

	Authors:
		Klemen Berkovič

	License:
		MIT

	Reference URL:

	Reference paper:

	Attributes:
		Name (List[str]): List of strings representing algorithm names
		lam (int): Number of new individuals generated by mutation in every iteration.

	See Also:
		* :class:`NiaPy.algorithms.basic.EvolutionStrategy1p1`
	"""
	Name = ['EvolutionStrategyMpL', 'EvolutionStrategy(mu+lambda)', 'ES(m+l)']

	@staticmethod
	def typeParameters():
		r"""Get dictionary with functions for checking values of parameters.

		Returns:
			Dict[str, Any]:
				* lam (Callable[[int], bool]): Accepts positive integers.

		See Also:
			* :func:`NiaPy.algorithms.basic.EvolutionStrategy1p1`
		"""
		d = EvolutionStrategy1p1.typeParameters()
		d['lam'] = lambda x: isinstance(x, int) and x > 0
		return d

	def setParameters(self, lam=45, **ukwargs):
		r"""Set the arguments of an algorithm.

		Arguments:
			lam (int): Number of new individual generated by mutation.
			**ukwargs (Dict[str, Any]): Additional arguments forwarded to the parent implementation.

		See Also:
			* :func:`NiaPy.algorithms.basic.es.EvolutionStrategy1p1.setParameters`
		"""
		EvolutionStrategy1p1.setParameters(self, InitPopFunc=defaultIndividualInit, **ukwargs)
		self.lam = lam

	def updateRho(self, pop, k):
		r"""Update standard deviation for population.

		The success ratio ``k / self.k`` is compared against 0.2: below it every individual's rho is multiplied by ``c_r``, above it by ``c_a``, at exactly 0.2 nothing changes. Individuals are modified in place.

		Args:
			pop (numpy.ndarray[Individual]): Current population.
			k (int): Number of successful mutations.
		"""
		# float() keeps the ratio fractional when both operands are ints under Python 2
		phi = float(k) / self.k
		if phi < 0.2:
			for i in pop: i.rho = self.c_r * i.rho
		elif phi > 0.2:
			for i in pop: i.rho = self.c_a * i.rho

	def changeCount(self, c, cn):
		r"""Update number of successful mutations for population.

		Args:
			c (numpy.ndarray[Individual]): Current population.
			cn (numpy.ndarray[Individual]): New population.

		Returns:
			int: Number of members of ``cn`` that are not contained in ``c``.
		"""
		k = 0
		for e in cn:
			if e not in c: k += 1
		return k

	def mutateRand(self, pop, task):
		r"""Mutate random individual from population.

		Args:
			pop (numpy.ndarray[Individual]): Current population.
			task (Task): Optimization task.

		Returns:
			numpy.ndarray: Random individual from population that was mutated.
		"""
		i = self.randint(self.mu)
		return task.repair(self.mutate(pop[i].x, pop[i].rho), rnd=self.Rand)

	def initPopulation(self, task):
		r"""Initialize starting population.

		Args:
			task (Task): Optimization task.

		Returns:
			Tuple[numpy.ndarray[Individual], numpy.ndarray[float], Dict[str, Any]]:
				1. Initialized population.
				2. Initialized populations function/fitness values.
				3. Additional arguments:
					* ki (int): Number of successful mutations.

		See Also:
			* :func:`NiaPy.algorithms.algorithm.Algorithm.initPopulation`
		"""
		c, fc, d = Algorithm.initPopulation(self, task)
		d.update({'ki': 0})
		return c, fc, d

	def runIteration(self, task, c, fpop, xb, fxb, ki, **dparams):
		r"""Core function of EvolutionStrategyMpL algorithm.

		Args:
			task (Task): Optimization task.
			c (numpy.ndarray): Current population.
			fpop (numpy.ndarray): Current populations fitness/function values.
			xb (numpy.ndarray): Global best individual.
			fxb (float): Global best individuals fitness/function value.
			ki (int): Number of successful mutations.
			**dparams (Dict[str, Any]): Additional arguments.

		Returns:
			Tuple[numpy.ndarray, numpy.ndarray, numpy.ndarray, float, Dict[str, Any]]:
				1. New population.
				2. New populations function/fitness values.
				3. New global best solution.
				4. New global best solutions fitness/objective value.
				5. Additional arguments:
					* ki (int): Number of successful mutations.
		"""
		# Every k-th iteration adapt rho of all individuals and restart the success count
		if task.Iters % self.k == 0:
			self.updateRho(c, ki)
			ki = 0
		# lam offspring, each mutated from a randomly chosen parent
		cn = objects2array([IndividualES(x=self.mutateRand(c, task), task=task, rnd=self.Rand) for _ in range(self.lam)])
		# Parents compete with offspring; the mu best of both survive
		cn = append(cn, c)
		order = argsort([ind.f for ind in cn])
		cn = objects2array([cn[j] for j in order[:self.mu]])
		ki += self.changeCount(c, cn)
		fcn = asarray([x.f for x in cn])
		xb, fxb = self.getBest(cn, fcn, xb, fxb)
		return cn, fcn, xb, fxb, {'ki': ki}
359
360
class EvolutionStrategyML(EvolutionStrategyMpL):
	r"""Implementation of (mu, lambda) evolution strategy algorithm. Algorithm is good for dynamic environments. Mu individuals create lambda children. Only best mu children go to new generation. Mu parents are discarded.

	Algorithm:
		(:math:`\mu, \lambda`) Evolution Strategy Algorithm

	Date:
		2018

	Authors:
		Klemen Berkovič

	License:
		MIT

	Reference URL:

	Reference paper:

	Attributes:
		Name (List[str]): List of strings representing algorithm names

	See Also:
		* :class:`NiaPy.algorithm.basic.es.EvolutionStrategyMpL`
	"""
	Name = ['EvolutionStrategyML', 'EvolutionStrategy(mu,lambda)', 'ES(m,l)']

	def newPop(self, pop):
		r"""Return new population.

		When ``mu < lam`` the ``mu`` best individuals of ``pop`` are returned. Otherwise ``pop`` does not hold enough individuals, so it is repeated until ``mu`` individuals are collected. Every copy is taken in order of increasing fitness value, so a final partial copy contains the best individuals.

		Args:
			pop (numpy.ndarray): Current population.

		Returns:
			numpy.ndarray: New population.
		"""
		pop_s = argsort([i.f for i in pop])
		if self.mu < self.lam: return objects2array([pop[i] for i in pop_s[:self.mu]])
		# Rank once so that every (partial) copy below is filled best-first
		ranked = [pop[i] for i in pop_s]
		npop = list()
		for i in range(int(ceil(float(self.mu) / self.lam))):
			missing = self.mu - i * self.lam
			npop.extend(ranked[:min(self.lam, missing)])
		return objects2array(npop)

	def initPopulation(self, task):
		r"""Initialize starting population.

		Args:
			task (Task): Optimization task.

		Returns:
			Tuple[numpy.ndarray[Individual], numpy.ndarray[float], Dict[str, Any]]:
				1. Initialized population.
				2. Initialized populations fitness/function values.
				3. Additional arguments (always empty, the success counter of the parent class is not used).

		See Also:
			* :func:`NiaPy.algorithm.basic.es.EvolutionStrategyMpL.initPopulation`
		"""
		c, fc, _ = EvolutionStrategyMpL.initPopulation(self, task)
		return c, fc, {}

	def runIteration(self, task, c, fpop, xb, fxb, **dparams):
		r"""Core function of EvolutionStrategyML algorithm.

		Args:
			task (Task): Optimization task.
			c (numpy.ndarray): Current population.
			fpop (numpy.ndarray): Current population fitness/function values.
			xb (numpy.ndarray): Global best individual.
			fxb (float): Global best individuals fitness/function value.
			**dparams Dict[str, Any]: Additional arguments.

		Returns:
			Tuple[numpy.ndarray, numpy.ndarray, numpy.ndarray, float, Dict[str, Any]]:
				1. New population.
				2. New populations fitness/function values.
				3. New global best solution.
				4. New global best solutions fitness/objective value.
				5. Additional arguments.
		"""
		# lam children, each mutated from a randomly chosen parent; keyword ``rnd`` matches the other IndividualES calls in this module
		cn = objects2array([IndividualES(x=self.mutateRand(c, task), task=task, rnd=self.Rand) for _ in range(self.lam)])
		# Parents are discarded, the next generation is selected from the children only
		c = self.newPop(cn)
		fc = asarray([x.f for x in c])
		xb, fxb = self.getBest(c, fc, xb, fxb)
		return c, fc, xb, fxb, {}
444
445
def CovarianceMaatrixAdaptionEvolutionStrategyF(task, epsilon=1e-20, rnd=rand):
	r"""Run the covariance matrix adaptation evolution strategy on a task.

	The loop runs until ``task.stopCondI()`` returns a true value. In every iteration ``lam`` steps are sampled from a zero mean normal distribution with covariance ``C``, applied to the best solution found so far, repaired and evaluated. The ``mu`` best steps then update the evolution paths, the step size ``sigma`` and the covariance matrix.

	Args:
		task (Task): Optimization task. Used members: ``D``, ``Iters``, ``bcRange``, ``bcLower``, ``bcUpper``, ``eval``, ``repair`` and ``stopCondI``.
		epsilon (float): Small number added to the Cholesky factor and to the right-hand side when updating the evolution path, also the threshold under which eigenvalues of the covariance matrix are repaired.
		rnd: Random generator providing ``uniform`` and ``multivariate_normal``.

	Returns:
		Tuple[numpy.ndarray, float]:
			1. Best found solution.
			2. Fitness/function value of the best found solution.
	"""
	lam, alpha_mu, sigma0 = (4 + round(3 * log(task.D))) * 10, 2, 0.3 * task.bcRange()
	mu = int(round(lam / 2))
	# Recombination weights, normalized to sum to 1
	w = log(mu + 0.5) - log(range(1, mu + 1))
	w = w / sum(w)
	mueff = 1 / sum(w ** 2)
	cs = (mueff + 2) / (task.D + mueff + 5)
	ds = 1 + cs + 2 * max(sqrt((mueff - 1) / (task.D + 1)) - 1, 0)
	# Float literals keep these ratios fractional when task.D is an int under Python 2
	ENN = sqrt(task.D) * (1 - 1.0 / (4 * task.D) + 1.0 / (21 * task.D ** 2))
	cc, c1 = (4 + mueff / task.D) / (4 + task.D + 2 * mueff / task.D), 2 / ((task.D + 1.3) ** 2 + mueff)
	cmu, hth = min(1 - c1, alpha_mu * (mueff - 2 + 1 / mueff) / ((task.D + 2) ** 2 + alpha_mu * mueff / 2)), (1.4 + 2.0 / (task.D + 1)) * ENN
	ps, pc, C, sigma = full(task.D, 0.0), full(task.D, 0.0), eye(task.D), sigma0
	x = rnd.uniform(task.bcLower(), task.bcUpper())
	x_f = task.eval(x)
	while not task.stopCondI():
		pop_step = asarray([rnd.multivariate_normal(full(task.D, 0.0), C) for _ in range(int(lam))])
		# Loop variable deliberately not named ``ps``: that name holds the evolution path
		pop = asarray([task.repair(x + sigma * step, rnd) for step in pop_step])
		pop_f = apply_along_axis(task.eval, 1, pop)
		isort = argsort(pop_f)
		pop, pop_f, pop_step = pop[isort[:mu]], pop_f[isort[:mu]], pop_step[isort[:mu]]
		if pop_f[0] < x_f: x, x_f = pop[0], pop_f[0]
		# Weighted mean of the mu best steps
		M = sum(w * pop_step.T, axis=1)
		# numpy.linalg.solve returns the solution array itself, so the whole vector is kept
		ps = solve(chol(C).conj() + epsilon, ((1 - cs) * ps + sqrt(cs * (2 - cs) * mueff) * M + epsilon).T).T
		# Out-of-place update so that sigma0 is not modified through the shared array
		sigma = sigma * exp(cs / ds * (norm(ps) / ENN - 1)) ** 0.3
		# Entries that overflowed to inf are reset to their initial value
		sigma = where(sigma == inf, sigma0, sigma)
		if norm(ps) / sqrt(1 - (1 - cs) ** (2 * (task.Iters + 1))) < hth: hs = 1
		else: hs = 0
		delta = (1 - hs) * cc * (2 - cc)
		pc = (1 - cc) * pc + hs * sqrt(cc * (2 - cc) * mueff) * M
		# The product of the two tiled arrays is the outer product of the vector with itself
		C = (1 - c1 - cmu) * C + c1 * (tile(pc, [len(pc), 1]) * tile(pc.reshape([len(pc), 1]), [1, len(pc)]) + delta * C)
		for i in range(mu): C += cmu * w[i] * tile(pop_step[i], [len(pop_step[i]), 1]) * tile(pop_step[i].reshape([len(pop_step[i]), 1]), [1, len(pop_step[i])])
		E, V = eig(C)
		if any(E < epsilon):
			# Clamp negative eigenvalues to 0 and rebuild C from the repaired spectrum
			E = fmax(E, 0)
			C = lstsq(V.T, dot(V, diag(E)).T, rcond=None)[0].T
	return x, x_f
482
483
class CovarianceMatrixAdaptionEvolutionStrategy(Algorithm):
	r"""Implementation of the covariance matrix adaptation evolution strategy (CMA-ES).

	Algorithm:
		Covariance Matrix Adaptation Evolution Strategy

	Date:
		2018

	Authors:
		Klemen Berkovič

	License:
		MIT

	Reference URL:
		https://arxiv.org/abs/1604.00772

	Reference paper:
		Hansen, Nikolaus. "The CMA evolution strategy: A tutorial." arXiv preprint arXiv:1604.00772 (2016).

	Attributes:
		Name (List[str]): List of names representing algorithm names
		epsilon (float): Small number passed to the CMA-ES routine of this module.
	"""
	Name = ['CovarianceMatrixAdaptionEvolutionStrategy', 'CMA-ES', 'CMAES']
	epsilon = 1e-20

	@staticmethod
	def typeParameters():
		r"""Get dictionary with functions for checking values of parameters.

		Returns:
			Dict[str, Callable]:
				* epsilon (Callable[[Union[float, int]], bool]): Accepts numbers strictly between 0 and 1.
		"""
		checks = dict()
		checks['epsilon'] = lambda x: isinstance(x, (float, int)) and 0 < x < 1
		return checks

	def setParameters(self, epsilon=1e-20, **ukwargs):
		r"""Set core parameters of CovarianceMatrixAdaptionEvolutionStrategy algorithm.

		Args:
			epsilon (float): Small number.
			**ukwargs (Dict[str, Any]): Additional arguments forwarded to the base class.
		"""
		Algorithm.setParameters(self, **ukwargs)
		self.epsilon = epsilon

	def runTask(self, task):
		r"""Run the CMA-ES routine of this module on the task.

		Args:
			task (Task): Optimization task.

		Returns:
			Whatever the CMA-ES routine returns for ``task``, ``self.epsilon`` and ``self.Rand``.
		"""
		outcome = CovarianceMaatrixAdaptionEvolutionStrategyF(task, self.epsilon, rnd=self.Rand)
		return outcome
536
537
# vim: tabstop=3 noexpandtab shiftwidth=3 softtabstop=3
538