Passed
Branch develop (de6256)
by Fabian
01:15
created

ecs_deploy.ecs.EcsTaskDefinition.__init__()   A

Complexity

Conditions 1

Size

Total Lines 18
Code Lines 14

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 12
CRAP Score 1

Importance

Changes 0
Metric Value
eloc 14
dl 0
loc 18
rs 9.7
c 0
b 0
f 0
ccs 12
cts 12
cp 1
cc 1
nop 11
crap 1

How to fix   Many Parameters   

Many Parameters

Methods with many parameters are not only hard to understand, but their parameters also often become inconsistent when you need more, or different data.

There are several approaches to avoid long parameter lists:

1 1
from datetime import datetime
2
3 1
from boto3.session import Session
4 1
from botocore.exceptions import ClientError, NoCredentialsError
5 1
from dateutil.tz.tz import tzlocal
6
7
8 1
class EcsClient(object):
9 1
    def __init__(self, access_key_id=None, secret_access_key=None,
10
                 region=None, profile=None):
11 1
        session = Session(aws_access_key_id=access_key_id,
12
                          aws_secret_access_key=secret_access_key,
13
                          region_name=region,
14
                          profile_name=profile)
15 1
        self.boto = session.client(u'ecs')
16
17 1
    def describe_services(self, cluster_name, service_name):
18 1
        return self.boto.describe_services(
19
            cluster=cluster_name,
20
            services=[service_name]
21
        )
22
23 1
    def describe_task_definition(self, task_definition_arn):
24 1
        try:
25 1
            return self.boto.describe_task_definition(
26
                taskDefinition=task_definition_arn
27
            )
28 1
        except ClientError:
29 1
            raise UnknownTaskDefinitionError(
30
                u'Unknown task definition arn: %s' % task_definition_arn
31
            )
32
33 1
    def list_tasks(self, cluster_name, service_name):
34 1
        return self.boto.list_tasks(
35
            cluster=cluster_name,
36
            serviceName=service_name
37
        )
38
39 1
    def describe_tasks(self, cluster_name, task_arns):
40 1
        return self.boto.describe_tasks(cluster=cluster_name, tasks=task_arns)
41
42 1
    def register_task_definition(self, family, containers, volumes, role_arn,
43
                                 additional_properties):
44 1
        return self.boto.register_task_definition(
45
            family=family,
46
            containerDefinitions=containers,
47
            volumes=volumes,
48
            taskRoleArn=role_arn,
49
            **additional_properties
50
        )
51
52 1
    def deregister_task_definition(self, task_definition_arn):
53 1
        return self.boto.deregister_task_definition(
54
            taskDefinition=task_definition_arn
55
        )
56
57 1
    def update_service(self, cluster, service, desired_count, task_definition):
58 1
        if desired_count is None:
59
            return self.boto.update_service(
60
                cluster=cluster,
61
                service=service,
62
                taskDefinition=task_definition
63
            )
64 1
        return self.boto.update_service(
65
            cluster=cluster,
66
            service=service,
67
            desiredCount=desired_count,
68
            taskDefinition=task_definition
69
        )
70
71 1
    def run_task(self, cluster, task_definition, count, started_by, overrides):
72 1
        return self.boto.run_task(
73
            cluster=cluster,
74
            taskDefinition=task_definition,
75
            count=count,
76
            startedBy=started_by,
77
            overrides=overrides
78
        )
79
80
81 1
class EcsService(dict):
82 1
    def __init__(self, cluster, service_definition=None, **kwargs):
83 1
        self._cluster = cluster
84 1
        super(EcsService, self).__init__(service_definition, **kwargs)
85
86 1
    def set_task_definition(self, task_definition):
87 1
        self[u'taskDefinition'] = task_definition.arn
88
89 1
    @property
90
    def cluster(self):
91 1
        return self._cluster
92
93 1
    @property
94
    def name(self):
95 1
        return self.get(u'serviceName')
96
97 1
    @property
98
    def task_definition(self):
99 1
        return self.get(u'taskDefinition')
100
101 1
    @property
102
    def desired_count(self):
103 1
        return self.get(u'desiredCount')
104
105 1
    @property
106
    def deployment_created_at(self):
107 1
        for deployment in self.get(u'deployments'):
108 1
            if deployment.get(u'status') == u'PRIMARY':
109 1
                return deployment.get(u'createdAt')
110 1
        return datetime.now()
111
112 1
    @property
113
    def deployment_updated_at(self):
114 1
        for deployment in self.get(u'deployments'):
115 1
            if deployment.get(u'status') == u'PRIMARY':
116 1
                return deployment.get(u'updatedAt')
117 1
        return datetime.now()
118
119 1
    @property
120
    def errors(self):
121 1
        return self.get_warnings(
122
            since=self.deployment_updated_at
123
        )
124
125 1
    @property
126
    def older_errors(self):
127 1
        return self.get_warnings(
128
            since=self.deployment_created_at,
129
            until=self.deployment_updated_at
130
        )
131
132 1
    def get_warnings(self, since=None, until=None):
133 1
        since = since or self.deployment_created_at
134 1
        until = until or datetime.now(tz=tzlocal())
135 1
        errors = {}
136 1
        for event in self.get(u'events'):
137 1
            if u'unable' not in event[u'message']:
138
                continue
139 1
            if since < event[u'createdAt'] < until:
140 1
                errors[event[u'createdAt']] = event[u'message']
141 1
        return errors
142
143
144 1
class EcsTaskDefinition(object):
145 1
    def __init__(self, containerDefinitions, volumes, family, revision,
146
                 status, taskDefinitionArn, requiresAttributes=None,
147
                 taskRoleArn=None, compatibilities=None, **kwargs):
148 1
        self.containers = containerDefinitions
149 1
        self.volumes = volumes
150 1
        self.family = family
151 1
        self.revision = revision
152 1
        self.status = status
153 1
        self.arn = taskDefinitionArn
154 1
        self.requires_attributes = requiresAttributes or {}
155 1
        self.role_arn = taskRoleArn or u''
156 1
        self.additional_properties = kwargs
157 1
        self._diff = []
158
159
        # the compatibilities parameter is returned from the ECS API, when
160
        # describing a task, but may not be included, when registering a new
161
        # task definition. Just storing it for now.
162 1
        self.compatibilities = compatibilities
163
164 1
    @property
165
    def container_names(self):
166 1
        for container in self.containers:
167 1
            yield container[u'name']
168
169 1
    @property
170
    def family_revision(self):
171 1
        return '%s:%d' % (self.family, self.revision)
172
173 1
    @property
174
    def diff(self):
175 1
        return self._diff
176
177 1
    def get_overrides(self):
178 1
        override = dict()
179 1
        overrides = []
180 1
        for diff in self.diff:
181 1
            if override.get('name') != diff.container:
182 1
                override = dict(name=diff.container)
183 1
                overrides.append(override)
184 1
            if diff.field == 'command':
185 1
                override['command'] = self.get_overrides_command(diff.value)
186 1
            elif diff.field == 'environment':
187 1
                override['environment'] = self.get_overrides_env(diff.value)
188 1
        return overrides
189
190 1
    @staticmethod
191
    def get_overrides_command(command):
192 1
        return command.split(' ')
193
194 1
    @staticmethod
195
    def get_overrides_env(env):
196 1
        return [{"name": e, "value": env[e]} for e in env]
197
198 1
    def set_images(self, tag=None, **images):
199 1
        self.validate_container_options(**images)
200 1
        for container in self.containers:
201 1
            if container[u'name'] in images:
202 1
                new_image = images[container[u'name']]
203 1
                diff = EcsTaskDefinitionDiff(
204
                    container=container[u'name'],
205
                    field=u'image',
206
                    value=new_image,
207
                    old_value=container[u'image']
208
                )
209 1
                self._diff.append(diff)
210 1
                container[u'image'] = new_image
211 1
            elif tag:
212 1
                image_definition = container[u'image'].rsplit(u':', 1)
213 1
                new_image = u'%s:%s' % (image_definition[0], tag.strip())
214 1
                diff = EcsTaskDefinitionDiff(
215
                    container=container[u'name'],
216
                    field=u'image',
217
                    value=new_image,
218
                    old_value=container[u'image']
219
                )
220 1
                self._diff.append(diff)
221 1
                container[u'image'] = new_image
222
223 1
    def set_commands(self, **commands):
224 1
        self.validate_container_options(**commands)
225 1
        for container in self.containers:
226 1
            if container[u'name'] in commands:
227 1
                new_command = commands[container[u'name']]
228 1
                diff = EcsTaskDefinitionDiff(
229
                    container=container[u'name'],
230
                    field=u'command',
231
                    value=new_command,
232
                    old_value=container.get(u'command')
233
                )
234 1
                self._diff.append(diff)
235 1
                container[u'command'] = [new_command]
236
237 1
    def set_environment(self, environment_list):
238 1
        environment = {}
239
240 1
        for env in environment_list:
241 1
            environment.setdefault(env[0], {})
242 1
            environment[env[0]][env[1]] = env[2]
243
244 1
        self.validate_container_options(**environment)
245 1
        for container in self.containers:
246 1
            if container[u'name'] in environment:
247 1
                self.apply_container_environment(
248
                    container=container,
249
                    new_environment=environment[container[u'name']]
250
                )
251
252 1
    def apply_container_environment(self, container, new_environment):
253 1
        environment = container.get('environment', {})
254 1
        old_environment = {env['name']: env['value'] for env in environment}
255 1
        merged = old_environment.copy()
256 1
        merged.update(new_environment)
257
258 1
        if old_environment == merged:
259 1
            return
260
261 1
        diff = EcsTaskDefinitionDiff(
262
            container=container[u'name'],
263
            field=u'environment',
264
            value=merged,
265
            old_value=old_environment
266
        )
267 1
        self._diff.append(diff)
268
269 1
        container[u'environment'] = [
270
            {"name": e, "value": merged[e]} for e in merged
271
        ]
272
273 1
    def validate_container_options(self, **container_options):
274 1
        for container_name in container_options:
275 1
            if container_name not in self.container_names:
276 1
                raise UnknownContainerError(
277
                    u'Unknown container: %s' % container_name
278
                )
279
280 1
    def set_role_arn(self, role_arn):
281 1
        if role_arn:
282 1
            diff = EcsTaskDefinitionDiff(
283
                container=None,
284
                field=u'role_arn',
285
                value=role_arn,
286
                old_value=self.role_arn
287
            )
288 1
            self.role_arn = role_arn
289 1
            self._diff.append(diff)
290
291
292 1
class EcsTaskDefinitionDiff(object):
293 1
    def __init__(self, container, field, value, old_value):
294 1
        self.container = container
295 1
        self.field = field
296 1
        self.value = value
297 1
        self.old_value = old_value
298
299 1
    def __repr__(self):
300 1
        if self.field == u'environment':
301 1
            return '\n'.join(self._get_environment_diffs(
302
                self.container,
303
                self.value,
304
                self.old_value,
305
            ))
306 1
        elif self.container:
307 1
            return u'Changed %s of container "%s" to: "%s" (was: "%s")' % (
308
                self.field,
309
                self.container,
310
                self.value,
311
                self.old_value
312
            )
313
        else:
314 1
            return u'Changed %s to: "%s" (was: "%s")' % (
315
                self.field,
316
                self.value,
317
                self.old_value
318
            )
319
320 1
    @staticmethod
321
    def _get_environment_diffs(container, env, old_env):
322 1
        msg = u'Changed environment "%s" of container "%s" to: "%s"'
323 1
        diffs = []
324 1
        for name, value in env.items():
325 1
            old_value = old_env.get(name)
326 1
            if value != old_value or not old_value:
327 1
                message = msg % (name, container, value)
328 1
                diffs.append(message)
329 1
        return diffs
330
331
332 1
class EcsAction(object):
333 1
    def __init__(self, client, cluster_name, service_name):
334 1
        self._client = client
335 1
        self._cluster_name = cluster_name
336 1
        self._service_name = service_name
337
338 1
        try:
339 1
            if service_name:
340 1
                self._service = self.get_service()
341 1
        except IndexError:
342 1
            raise EcsConnectionError(
343
                u'An error occurred when calling the DescribeServices '
344
                u'operation: Service not found.'
345
            )
346 1
        except ClientError as e:
347 1
            raise EcsConnectionError(str(e))
348 1
        except NoCredentialsError:
349 1
            raise EcsConnectionError(
350
                u'Unable to locate credentials. Configure credentials '
351
                u'by running "aws configure".'
352
            )
353
354 1
    def get_service(self):
355 1
        services_definition = self._client.describe_services(
356
            cluster_name=self._cluster_name,
357
            service_name=self._service_name
358
        )
359 1
        return EcsService(
360
            cluster=self._cluster_name,
361
            service_definition=services_definition[u'services'][0]
362
        )
363
364 1
    def get_current_task_definition(self, service):
365 1
        return self.get_task_definition(service.task_definition)
366
367 1
    def get_task_definition(self, task_definition):
368 1
        task_definition_payload = self._client.describe_task_definition(
369
            task_definition_arn=task_definition
370
        )
371
372 1
        task_definition = EcsTaskDefinition(
373
            **task_definition_payload[u'taskDefinition']
374
        )
375 1
        return task_definition
376
377 1
    def update_task_definition(self, task_definition):
378 1
        response = self._client.register_task_definition(
379
            family=task_definition.family,
380
            containers=task_definition.containers,
381
            volumes=task_definition.volumes,
382
            role_arn=task_definition.role_arn,
383
            additional_properties=task_definition.additional_properties
384
        )
385 1
        new_task_definition = EcsTaskDefinition(**response[u'taskDefinition'])
386 1
        return new_task_definition
387
388 1
    def deregister_task_definition(self, task_definition):
389 1
        self._client.deregister_task_definition(task_definition.arn)
390
391 1
    def update_service(self, service, desired_count=None):
392 1
        response = self._client.update_service(
393
            cluster=service.cluster,
394
            service=service.name,
395
            desired_count=desired_count,
396
            task_definition=service.task_definition
397
        )
398 1
        return EcsService(self._cluster_name, response[u'service'])
399
400 1
    def is_deployed(self, service):
401 1
        if len(service[u'deployments']) != 1:
402 1
            return False
403 1
        running_tasks = self._client.list_tasks(
404
            cluster_name=service.cluster,
405
            service_name=service.name
406
        )
407 1
        if not running_tasks[u'taskArns']:
408 1
            return service.desired_count == 0
409 1
        running_count = self.get_running_tasks_count(
410
            service=service,
411
            task_arns=running_tasks[u'taskArns']
412
        )
413 1
        return service.desired_count == running_count
414
415 1
    def get_running_tasks_count(self, service, task_arns):
416 1
        running_count = 0
417 1
        tasks_details = self._client.describe_tasks(
418
            cluster_name=self._cluster_name,
419
            task_arns=task_arns
420
        )
421 1
        for task in tasks_details[u'tasks']:
422 1
            arn = task[u'taskDefinitionArn']
423 1
            status = task[u'lastStatus']
424 1
            if arn == service.task_definition and status == u'RUNNING':
425 1
                running_count += 1
426 1
        return running_count
427
428 1
    @property
429
    def client(self):
430 1
        return self._client
431
432 1
    @property
433
    def service(self):
434 1
        return self._service
435
436 1
    @property
437
    def cluster_name(self):
438 1
        return self._cluster_name
439
440 1
    @property
441
    def service_name(self):
442 1
        return self._service_name
443
444
445 1
class DeployAction(EcsAction):
446 1
    def deploy(self, task_definition):
447 1
        try:
448 1
            self._service.set_task_definition(task_definition)
449 1
            return self.update_service(self._service)
450 1
        except ClientError as e:
451 1
            raise EcsError(str(e))
452
453
454 1
class ScaleAction(EcsAction):
455 1
    def scale(self, desired_count):
456 1
        try:
457 1
            return self.update_service(self._service, desired_count)
458 1
        except ClientError as e:
459 1
            raise EcsError(str(e))
460
461
462 1
class RunAction(EcsAction):
463 1
    def __init__(self, client, cluster_name):
464 1
        super(RunAction, self).__init__(client, cluster_name, None)
465 1
        self._client = client
466 1
        self._cluster_name = cluster_name
467 1
        self.started_tasks = []
468
469 1
    def run(self, task_definition, count, started_by):
470 1
        try:
471 1
            result = self._client.run_task(
472
                cluster=self._cluster_name,
473
                task_definition=task_definition.family_revision,
474
                count=count,
475
                started_by=started_by,
476
                overrides=dict(containerOverrides=task_definition.get_overrides())
477
            )
478 1
            self.started_tasks = result['tasks']
479 1
            return True
480 1
        except ClientError as e:
481 1
            raise EcsError(str(e))
482
483
484 1
class EcsError(Exception):
485 1
    pass
486
487
488 1
class EcsConnectionError(EcsError):
489 1
    pass
490
491
492 1
class UnknownContainerError(EcsError):
493 1
    pass
494
495
496 1
class TaskPlacementError(EcsError):
497 1
    pass
498
499
500 1
class UnknownTaskDefinitionError(EcsError):
501
    pass
502