Completed
Pull Request — develop (#58)
by
unknown
02:45
created

EcsClient   A

Complexity

Total Complexity 11

Size/Duplication

Total Lines 75
Duplicated Lines 0 %

Test Coverage

Coverage 96.15%

Importance

Changes 4
Bugs 0 Features 0
Metric Value
wmc 11
c 4
b 0
f 0
dl 0
loc 75
ccs 25
cts 26
cp 0.9615
rs 10

10 Methods

Rating   Name   Duplication   Size   Complexity  
A describe_services() 0 4 1
A list_tasks() 0 4 1
A describe_tasks() 0 2 1
A deregister_task_definition() 0 3 1
A update_service() 0 6 1
A __init__() 0 8 1
A run_task() 0 7 1
A update_rule() 0 8 1
A describe_task_definition() 0 8 2
A register_task_definition() 0 8 1
1 1
from datetime import datetime
2
3 1
from boto3.session import Session
4 1
from botocore.exceptions import ClientError, NoCredentialsError
5 1
from dateutil.tz.tz import tzlocal
6
7
8 1
class EcsClient(object):
9 1
    def __init__(self, access_key_id=None, secret_access_key=None,
10
                 region=None, profile=None):
11 1
        session = Session(aws_access_key_id=access_key_id,
12
                          aws_secret_access_key=secret_access_key,
13
                          region_name=region,
14
                          profile_name=profile)
15 1
        self.boto = session.client(u'ecs')
16 1
        self.events = session.client(u'events')
17
18 1
    def describe_services(self, cluster_name, service_name):
19 1
        return self.boto.describe_services(
20
            cluster=cluster_name,
21
            services=[service_name]
22
        )
23
24 1
    def describe_task_definition(self, task_definition_arn):
25 1
        try:
26 1
            return self.boto.describe_task_definition(
27
                taskDefinition=task_definition_arn
28
            )
29 1
        except ClientError:
30 1
            raise UnknownTaskDefinitionError(
31
                u'Unknown task definition arn: %s' % task_definition_arn
32
            )
33
34 1
    def list_tasks(self, cluster_name, service_name):
35 1
        return self.boto.list_tasks(
36
            cluster=cluster_name,
37
            serviceName=service_name
38
        )
39
40 1
    def describe_tasks(self, cluster_name, task_arns):
41 1
        return self.boto.describe_tasks(cluster=cluster_name, tasks=task_arns)
42
43 1
    def register_task_definition(self, family, containers, volumes, role_arn,
44
                                 additional_properties):
45 1
        return self.boto.register_task_definition(
46
            family=family,
47
            containerDefinitions=containers,
48
            volumes=volumes,
49
            taskRoleArn=role_arn,
50
            **additional_properties
51
        )
52
53 1
    def deregister_task_definition(self, task_definition_arn):
54 1
        return self.boto.deregister_task_definition(
55
            taskDefinition=task_definition_arn
56
        )
57
58 1
    def update_service(self, cluster, service, desired_count, task_definition):
59 1
        return self.boto.update_service(
60
            cluster=cluster,
61
            service=service,
62
            desiredCount=desired_count,
63
            taskDefinition=task_definition
64
        )
65
66 1
    def run_task(self, cluster, task_definition, count, started_by, overrides):
67 1
        return self.boto.run_task(
68
            cluster=cluster,
69
            taskDefinition=task_definition,
70
            count=count,
71
            startedBy=started_by,
72
            overrides=overrides
73
        )
74
75 1
    def update_rule(self, rule, arn, target_id, role_arn, task_definition_arn):
76
        self.events.put_targets(
77
            Rule=rule,
78
            Targets=[{
79
                'Id': target_id,
80
                'Arn': arn,
81
                'RoleArn': role_arn,
82
                'EcsParameters': {'TaskDefinitionArn': task_definition_arn, 'TaskCount': 1}
83
            }]
84
        )
85
86
87 1
class EcsService(dict):
88 1
    def __init__(self, cluster, service_definition=None, **kwargs):
89 1
        self._cluster = cluster
90 1
        super(EcsService, self).__init__(service_definition, **kwargs)
91
92 1
    def set_desired_count(self, desired_count):
93 1
        self[u'desiredCount'] = desired_count
94
95 1
    def set_task_definition(self, task_definition):
96 1
        self[u'taskDefinition'] = task_definition.arn
97
98 1
    @property
99
    def cluster(self):
100 1
        return self._cluster
101
102 1
    @property
103
    def name(self):
104 1
        return self.get(u'serviceName')
105
106 1
    @property
107
    def task_definition(self):
108 1
        return self.get(u'taskDefinition')
109
110 1
    @property
111
    def desired_count(self):
112 1
        return self.get(u'desiredCount')
113
114 1
    @property
115
    def deployment_created_at(self):
116 1
        for deployment in self.get(u'deployments'):
117 1
            if deployment.get(u'status') == u'PRIMARY':
118 1
                return deployment.get(u'createdAt')
119 1
        return datetime.now()
120
121 1
    @property
122
    def deployment_updated_at(self):
123 1
        for deployment in self.get(u'deployments'):
124 1
            if deployment.get(u'status') == u'PRIMARY':
125 1
                return deployment.get(u'updatedAt')
126 1
        return datetime.now()
127
128 1
    @property
129
    def errors(self):
130 1
        return self.get_warnings(
131
            since=self.deployment_updated_at
132
        )
133
134 1
    @property
135
    def older_errors(self):
136 1
        return self.get_warnings(
137
            since=self.deployment_created_at,
138
            until=self.deployment_updated_at
139
        )
140
141 1
    def get_warnings(self, since=None, until=None):
142 1
        since = since or self.deployment_created_at
143 1
        until = until or datetime.now(tz=tzlocal())
144 1
        errors = {}
145 1
        for event in self.get(u'events'):
146 1
            if u'unable' not in event[u'message']:
147
                continue
148 1
            if since < event[u'createdAt'] < until:
149 1
                errors[event[u'createdAt']] = event[u'message']
150 1
        return errors
151
152
153 1
class EcsTaskDefinition(object):
154 1
    def __init__(self, containerDefinitions, volumes, family, revision,
155
                 status, taskDefinitionArn, requiresAttributes=None,
156
                 taskRoleArn=None, compatibilities=None, **kwargs):
157 1
        self.containers = containerDefinitions
158 1
        self.volumes = volumes
159 1
        self.family = family
160 1
        self.revision = revision
161 1
        self.status = status
162 1
        self.arn = taskDefinitionArn
163 1
        self.requires_attributes = requiresAttributes or {}
164 1
        self.role_arn = taskRoleArn or u''
165 1
        self.additional_properties = kwargs
166 1
        self._diff = []
167
168
        # the compatibilities parameter is returned from the ECS API, when
169
        # describing a task, but may not be included, when registering a new
170
        # task definition. Just storing it for now.
171 1
        self.compatibilities = compatibilities
172
173 1
    @property
174
    def container_names(self):
175 1
        for container in self.containers:
176 1
            yield container[u'name']
177
178 1
    @property
179
    def family_revision(self):
180 1
        return '%s:%d' % (self.family, self.revision)
181
182 1
    @property
183
    def diff(self):
184 1
        return self._diff
185
186 1
    def get_overrides(self):
187 1
        override = dict()
188 1
        overrides = []
189 1
        for diff in self.diff:
190 1
            if override.get('name') != diff.container:
191 1
                override = dict(name=diff.container)
192 1
                overrides.append(override)
193 1
            if diff.field == 'command':
194 1
                override['command'] = self.get_overrides_command(diff.value)
195 1
            elif diff.field == 'environment':
196 1
                override['environment'] = self.get_overrides_env(diff.value)
197 1
        return overrides
198
199 1
    @staticmethod
200
    def get_overrides_command(command):
201 1
        return command.split(' ')
202
203 1
    @staticmethod
204
    def get_overrides_env(env):
205 1
        return [{"name": e, "value": env[e]} for e in env]
206
207 1
    def set_images(self, tag=None, **images):
208 1
        self.validate_container_options(**images)
209 1
        for container in self.containers:
210 1
            if container[u'name'] in images:
211 1
                new_image = images[container[u'name']]
212 1
                diff = EcsTaskDefinitionDiff(
213
                    container=container[u'name'],
214
                    field=u'image',
215
                    value=new_image,
216
                    old_value=container[u'image']
217
                )
218 1
                self._diff.append(diff)
219 1
                container[u'image'] = new_image
220 1 View Code Duplication
            elif tag:
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
221 1
                image_definition = container[u'image'].rsplit(u':', 1)
222 1
                new_image = u'%s:%s' % (image_definition[0], tag.strip())
223 1
                diff = EcsTaskDefinitionDiff(
224
                    container=container[u'name'],
225
                    field=u'image',
226
                    value=new_image,
227
                    old_value=container[u'image']
228
                )
229 1
                self._diff.append(diff)
230 1
                container[u'image'] = new_image
231
232 1
    def set_commands(self, **commands):
233 1
        self.validate_container_options(**commands)
234 1
        for container in self.containers:
235 1 View Code Duplication
            if container[u'name'] in commands:
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
236 1
                new_command = commands[container[u'name']]
237 1
                diff = EcsTaskDefinitionDiff(
238
                    container=container[u'name'],
239
                    field=u'command',
240
                    value=new_command,
241
                    old_value=container.get(u'command')
242
                )
243 1
                self._diff.append(diff)
244 1
                container[u'command'] = [new_command]
245
246 1
    def set_environment(self, environment_list):
247 1
        environment = {}
248
249 1
        for env in environment_list:
250 1
            environment.setdefault(env[0], {})
251 1
            environment[env[0]][env[1]] = env[2]
252
253 1
        self.validate_container_options(**environment)
254 1
        for container in self.containers:
255 1
            if container[u'name'] in environment:
256 1
                self.apply_container_environment(
257
                    container=container,
258
                    new_environment=environment[container[u'name']]
259
                )
260
261 1
    def apply_container_environment(self, container, new_environment):
262 1
        environment = container.get('environment', {})
263 1
        old_environment = {env['name']: env['value'] for env in environment}
264 1
        merged = old_environment.copy()
265 1
        merged.update(new_environment)
266
267 1
        if old_environment == merged:
268 1
            return
269
270 1
        diff = EcsTaskDefinitionDiff(
271
            container=container[u'name'],
272
            field=u'environment',
273
            value=merged,
274
            old_value=old_environment
275
        )
276 1
        self._diff.append(diff)
277
278 1
        container[u'environment'] = [
279
            {"name": e, "value": merged[e]} for e in merged
280
        ]
281
282 1
    def validate_container_options(self, **container_options):
283 1
        for container_name in container_options:
284 1
            if container_name not in self.container_names:
285 1
                raise UnknownContainerError(
286
                    u'Unknown container: %s' % container_name
287
                )
288
289 1
    def set_role_arn(self, role_arn):
290 1
        if role_arn:
291 1
            diff = EcsTaskDefinitionDiff(
292
                container=None,
293
                field=u'role_arn',
294
                value=role_arn,
295
                old_value=self.role_arn
296
            )
297 1
            self.role_arn = role_arn
298 1
            self._diff.append(diff)
299
300
301 1
class EcsTaskDefinitionDiff(object):
302 1
    def __init__(self, container, field, value, old_value):
303 1
        self.container = container
304 1
        self.field = field
305 1
        self.value = value
306 1
        self.old_value = old_value
307
308 1
    def __repr__(self):
309 1
        if self.field == u'environment':
310 1
            return '\n'.join(self._get_environment_diffs(
311
                self.container,
312
                self.value,
313
                self.old_value,
314
            ))
315 1
        elif self.container:
316 1
            return u'Changed %s of container "%s" to: "%s" (was: "%s")' % (
317
                self.field,
318
                self.container,
319
                self.value,
320
                self.old_value
321
            )
322
        else:
323 1
            return u'Changed %s to: "%s" (was: "%s")' % (
324
                self.field,
325
                self.value,
326
                self.old_value
327
            )
328
329 1
    @staticmethod
330
    def _get_environment_diffs(container, env, old_env):
331 1
        msg = u'Changed environment "%s" of container "%s" to: "%s"'
332 1
        diffs = []
333 1
        for name, value in env.items():
334 1
            old_value = old_env.get(name)
335 1
            if value != old_value or not old_value:
336 1
                message = msg % (name, container, value)
337 1
                diffs.append(message)
338 1
        return diffs
339
340
341 1
class EcsAction(object):
342 1
    def __init__(self, client, cluster_name, service_name):
343 1
        self._client = client
344 1
        self._cluster_name = cluster_name
345 1
        self._service_name = service_name
346
347 1
        try:
348 1
            if service_name:
349 1
                self._service = self.get_service()
350 1
        except IndexError:
351 1
            raise EcsConnectionError(
352
                u'An error occurred when calling the DescribeServices '
353
                u'operation: Service not found.'
354
            )
355 1
        except ClientError as e:
356 1
            raise EcsConnectionError(str(e))
357 1
        except NoCredentialsError:
358 1
            raise EcsConnectionError(
359
                u'Unable to locate credentials. Configure credentials '
360
                u'by running "aws configure".'
361
            )
362
363 1
    def get_service(self):
364 1
        services_definition = self._client.describe_services(
365
            cluster_name=self._cluster_name,
366
            service_name=self._service_name
367
        )
368 1
        return EcsService(
369
            cluster=self._cluster_name,
370
            service_definition=services_definition[u'services'][0]
371
        )
372
373 1
    def get_current_task_definition(self, service):
374 1
        return self.get_task_definition(service.task_definition)
375
376 1
    def get_task_definition(self, task_definition):
377 1
        task_definition_payload = self._client.describe_task_definition(
378
            task_definition_arn=task_definition
379
        )
380
381 1
        task_definition = EcsTaskDefinition(
382
            **task_definition_payload[u'taskDefinition']
383
        )
384 1
        return task_definition
385
386 1
    def update_task_definition(self, task_definition):
387 1
        response = self._client.register_task_definition(
388
            family=task_definition.family,
389
            containers=task_definition.containers,
390
            volumes=task_definition.volumes,
391
            role_arn=task_definition.role_arn,
392
            additional_properties=task_definition.additional_properties
393
        )
394 1
        new_task_definition = EcsTaskDefinition(**response[u'taskDefinition'])
395 1
        return new_task_definition
396
397 1
    def deregister_task_definition(self, task_definition):
398 1
        self._client.deregister_task_definition(task_definition.arn)
399
400 1
    def update_service(self, service):
401 1
        response = self._client.update_service(
402
            cluster=service.cluster,
403
            service=service.name,
404
            desired_count=service.desired_count,
405
            task_definition=service.task_definition
406
        )
407 1
        return EcsService(self._cluster_name, response[u'service'])
408
409 1
    def is_deployed(self, service):
410 1
        if len(service[u'deployments']) != 1:
411 1
            return False
412 1
        running_tasks = self._client.list_tasks(
413
            cluster_name=service.cluster,
414
            service_name=service.name
415
        )
416 1
        if not running_tasks[u'taskArns']:
417 1
            return service.desired_count == 0
418 1
        running_count = self.get_running_tasks_count(
419
            service=service,
420
            task_arns=running_tasks[u'taskArns']
421
        )
422 1
        return service.desired_count == running_count
423
424 1
    def get_running_tasks_count(self, service, task_arns):
425 1
        running_count = 0
426 1
        tasks_details = self._client.describe_tasks(
427
            cluster_name=self._cluster_name,
428
            task_arns=task_arns
429
        )
430 1
        for task in tasks_details[u'tasks']:
431 1
            arn = task[u'taskDefinitionArn']
432 1
            status = task[u'lastStatus']
433 1
            if arn == service.task_definition and status == u'RUNNING':
434 1
                running_count += 1
435 1
        return running_count
436
437 1
    @property
438
    def client(self):
439 1
        return self._client
440
441 1
    @property
442
    def service(self):
443 1
        return self._service
444
445 1
    @property
446
    def cluster_name(self):
447 1
        return self._cluster_name
448
449 1
    @property
450
    def service_name(self):
451 1
        return self._service_name
452
453
454 1
class DeployAction(EcsAction):
455 1
    def deploy(self, task_definition):
456 1
        try:
457 1
            self._service.set_task_definition(task_definition)
458 1
            return self.update_service(self._service)
459 1
        except ClientError as e:
460 1
            raise EcsError(str(e))
461
462
463 1
class ScaleAction(EcsAction):
464 1
    def scale(self, desired_count):
465 1
        try:
466 1
            self._service.set_desired_count(desired_count)
467 1
            return self.update_service(self._service)
468 1
        except ClientError as e:
469 1
            raise EcsError(str(e))
470
471
472 1
class RunAction(EcsAction):
473 1
    def __init__(self, client, cluster_name):
474 1
        super(RunAction, self).__init__(client, cluster_name, None)
475 1
        self._client = client
476 1
        self._cluster_name = cluster_name
477 1
        self.started_tasks = []
478
479 1
    def run(self, task_definition, count, started_by):
480 1
        try:
481 1
            result = self._client.run_task(
482
                cluster=self._cluster_name,
483
                task_definition=task_definition.family_revision,
484
                count=count,
485
                started_by=started_by,
486
                overrides=dict(containerOverrides=task_definition.get_overrides())
487
            )
488 1
            self.started_tasks = result['tasks']
489 1
            return True
490 1
        except ClientError as e:
491 1
            raise EcsError(str(e))
492
493
494 1
class EcsError(Exception):
495 1
    pass
496
497
498 1
class EcsConnectionError(EcsError):
499 1
    pass
500
501
502 1
class UnknownContainerError(EcsError):
503 1
    pass
504
505
506 1
class TaskPlacementError(EcsError):
507 1
    pass
508
509
510 1
class UnknownTaskDefinitionError(EcsError):
511
    pass
512