Completed
Pull Request — develop (#29)
by Fabian
01:38
created

EcsTaskDefinition.diff()   A

Complexity

Conditions 1

Size

Total Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1

Importance

Changes 2
Bugs 0 Features 0
Metric Value
c 2
b 0
f 0
dl 0
loc 3
ccs 2
cts 2
cp 1
rs 10
cc 1
crap 1
1 1
from datetime import datetime
2
3 1
from boto3.session import Session
4 1
from botocore.exceptions import ClientError, NoCredentialsError
5 1
from dateutil.tz.tz import tzlocal
6
7
8 1
class EcsClient(object):
9 1
    def __init__(self, access_key_id=None, secret_access_key=None,
10
                 region=None, profile=None):
11 1
        session = Session(aws_access_key_id=access_key_id,
12
                          aws_secret_access_key=secret_access_key,
13
                          region_name=region,
14
                          profile_name=profile)
15 1
        self.boto = session.client(u'ecs')
16
17 1
    def describe_services(self, cluster_name, service_name):
18 1
        return self.boto.describe_services(
19
            cluster=cluster_name,
20
            services=[service_name]
21
        )
22
23 1
    def describe_task_definition(self, task_definition_arn):
24 1
        try:
25 1
            return self.boto.describe_task_definition(
26
                taskDefinition=task_definition_arn
27
            )
28 1
        except ClientError:
29 1
            raise UnknownTaskDefinitionError(
30
                u'Unknown task definition arn: %s' % task_definition_arn
31
            )
32
33 1
    def list_tasks(self, cluster_name, service_name):
34 1
        return self.boto.list_tasks(
35
            cluster=cluster_name,
36
            serviceName=service_name
37
        )
38
39 1
    def describe_tasks(self, cluster_name, task_arns):
40 1
        return self.boto.describe_tasks(cluster=cluster_name, tasks=task_arns)
41
42 1
    def register_task_definition(self, family, containers, volumes, role_arn,
43
                                 additional_properties):
44 1
        return self.boto.register_task_definition(
45
            family=family,
46
            containerDefinitions=containers,
47
            volumes=volumes,
48
            taskRoleArn=role_arn or u'',
49
            **additional_properties
50
        )
51
52 1
    def deregister_task_definition(self, task_definition_arn):
53 1
        return self.boto.deregister_task_definition(
54
            taskDefinition=task_definition_arn
55
        )
56
57 1
    def update_service(self, cluster, service, desired_count, task_definition):
58 1
        return self.boto.update_service(
59
            cluster=cluster,
60
            service=service,
61
            desiredCount=desired_count,
62
            taskDefinition=task_definition
63
        )
64
65 1
    def run_task(self, cluster, task_definition, count, started_by, overrides):
66 1
        return self.boto.run_task(
67
            cluster=cluster,
68
            taskDefinition=task_definition,
69
            count=count,
70
            startedBy=started_by,
71
            overrides=overrides
72
        )
73
74
75 1
class EcsService(dict):
76 1
    def __init__(self, cluster, service_definition=None, **kwargs):
77 1
        self._cluster = cluster
78 1
        super(EcsService, self).__init__(service_definition, **kwargs)
79
80 1
    def set_desired_count(self, desired_count):
81 1
        self[u'desiredCount'] = desired_count
82
83 1
    def set_task_definition(self, task_definition):
84 1
        self[u'taskDefinition'] = task_definition.arn
85
86 1
    @property
87
    def cluster(self):
88 1
        return self._cluster
89
90 1
    @property
91
    def name(self):
92 1
        return self.get(u'serviceName')
93
94 1
    @property
95
    def task_definition(self):
96 1
        return self.get(u'taskDefinition')
97
98 1
    @property
99
    def desired_count(self):
100 1
        return self.get(u'desiredCount')
101
102 1
    @property
103
    def deployment_created_at(self):
104 1
        for deployment in self.get(u'deployments'):
105 1
            if deployment.get(u'status') == u'PRIMARY':
106 1
                return deployment.get(u'createdAt')
107 1
        return datetime.now()
108
109 1
    @property
110
    def deployment_updated_at(self):
111 1
        for deployment in self.get(u'deployments'):
112 1
            if deployment.get(u'status') == u'PRIMARY':
113 1
                return deployment.get(u'updatedAt')
114 1
        return datetime.now()
115
116 1
    @property
117
    def errors(self):
118 1
        return self.get_warnings(
119
            since=self.deployment_updated_at
120
        )
121
122 1
    @property
123
    def older_errors(self):
124 1
        return self.get_warnings(
125
            since=self.deployment_created_at,
126
            until=self.deployment_updated_at
127
        )
128
129 1
    def get_warnings(self, since=None, until=None):
130 1
        since = since or self.deployment_created_at
131 1
        until = until or datetime.now(tz=tzlocal())
132 1
        errors = {}
133 1
        for event in self.get(u'events'):
134 1
            if u'unable' not in event[u'message']:
135
                continue
136 1
            if since < event[u'createdAt'] < until:
137 1
                errors[event[u'createdAt']] = event[u'message']
138 1
        return errors
139
140
141 1
class EcsTaskDefinition(object):
142 1
    def __init__(self, containerDefinitions, volumes, family, revision,
143
                 taskRoleArn, status, taskDefinitionArn, requiresAttributes,
144
                 **kwargs):
145 1
        self.containers = containerDefinitions
146 1
        self.volumes = volumes
147 1
        self.family = family
148 1
        self.revision = revision
149 1
        self.role_arn = taskRoleArn
150 1
        self.arn = taskDefinitionArn
151 1
        self.status = status
152 1
        self.requires_attributes = requiresAttributes
153 1
        self.additional_properties = kwargs
154 1
        self._diff = []
155
156 1
    @property
157
    def container_names(self):
158 1
        for container in self.containers:
159 1
            yield container[u'name']
160
161 1
    @property
162
    def family_revision(self):
163 1
        return '%s:%d' % (self.family, self.revision)
164
165 1
    @property
166
    def diff(self):
167 1
        return self._diff
168
169 1
    def get_overrides(self):
170 1
        override = dict()
171 1
        overrides = []
172 1
        for diff in self.diff:
173 1
            if override.get('name') != diff.container:
174 1
                override = dict(name=diff.container)
175 1
                overrides.append(override)
176 1
            if diff.field == 'command':
177 1
                override['command'] = self.get_overrides_command(diff.value)
178 1
            elif diff.field == 'environment':
179 1
                override['environment'] = self.get_overrides_env(diff.value)
180 1
        return overrides
181
182 1
    @staticmethod
183
    def get_overrides_command(command):
184 1
        return command.split(' ')
185
186 1
    @staticmethod
187
    def get_overrides_env(env):
188 1
        return [{"name": e, "value": env[e]} for e in env]
189
190 1
    def set_images(self, tag=None, **images):
191 1
        self.validate_container_options(**images)
192 1
        for container in self.containers:
193 1
            if container[u'name'] in images:
194 1
                new_image = images[container[u'name']]
195 1
                diff = EcsTaskDefinitionDiff(
196
                    container=container[u'name'],
197
                    field=u'image',
198
                    value=new_image,
199
                    old_value=container[u'image']
200
                )
201 1
                self._diff.append(diff)
202 1
                container[u'image'] = new_image
203 1
            elif tag:
204 1
                image_definition = container[u'image'].rsplit(u':', 1)
205 1
                new_image = u'%s:%s' % (image_definition[0], tag.strip())
206 1
                diff = EcsTaskDefinitionDiff(
207
                    container=container[u'name'],
208
                    field=u'image',
209
                    value=new_image,
210
                    old_value=container[u'image']
211
                )
212 1
                self._diff.append(diff)
213 1
                container[u'image'] = new_image
214
215 1
    def set_commands(self, **commands):
216 1 View Code Duplication
        self.validate_container_options(**commands)
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
217 1
        for container in self.containers:
218 1
            if container[u'name'] in commands:
219 1
                new_command = commands[container[u'name']]
220 1
                diff = EcsTaskDefinitionDiff(
221
                    container=container[u'name'],
222
                    field=u'command',
223
                    value=new_command,
224
                    old_value=container.get(u'command')
225
                )
226 1
                self._diff.append(diff)
227 1
                container[u'command'] = [new_command]
228
229 1
    def set_environment(self, environment_list):
230 1
        environment = {}
231 View Code Duplication
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
232 1
        for env in environment_list:
233 1
            environment.setdefault(env[0], {})
234 1
            environment[env[0]][env[1]] = env[2]
235
236 1
        self.validate_container_options(**environment)
237 1
        for container in self.containers:
238 1
            if container[u'name'] in environment:
239 1
                self.apply_container_environment(
240
                    container=container,
241
                    new_environment=environment[container[u'name']]
242
                )
243
244 1
    def apply_container_environment(self, container, new_environment):
245 1
        environment = container.get('environment', {})
246 1
        old_environment = {env['name']: env['value'] for env in environment}
247 1
        merged = old_environment.copy()
248 1
        merged.update(new_environment)
249
250 1
        diff = EcsTaskDefinitionDiff(
251
            container=container[u'name'],
252
            field=u'environment',
253
            value=merged,
254
            old_value=old_environment
255
        )
256 1
        self._diff.append(diff)
257
258 1
        container[u'environment'] = [
259
            {"name": e, "value": merged[e]} for e in merged
260
        ]
261
262 1
    def validate_container_options(self, **container_options):
263 1
        for container_name in container_options:
264 1
            if container_name not in self.container_names:
265 1
                raise UnknownContainerError(
266
                    u'Unknown container: %s' % container_name
267
                )
268
269 1
    def set_role_arn(self, role_arn):
270 1
        if role_arn:
271 1
            diff = EcsTaskDefinitionDiff(
272
                container=None,
273
                field=u'role_arn',
274
                value=role_arn,
275
                old_value=self.role_arn
276
            )
277 1
            self.role_arn = role_arn
278 1
            self._diff.append(diff)
279
280
281 1
class EcsTaskDefinitionDiff(object):
282 1
    def __init__(self, container, field, value, old_value):
283 1
        self.container = container
284 1
        self.field = field
285 1
        self.value = value
286 1
        self.old_value = old_value
287
288 1
    def __repr__(self):
289 1
        if self.field == u'environment':
290 1
            return '\n'.join(self._get_environment_diffs(
291
                self.container,
292
                self.value,
293
                self.old_value,
294
            ))
295 1
        elif self.container:
296 1
            return u'Changed %s of container "%s" to: "%s" (was: "%s")' % (
297
                self.field,
298
                self.container,
299
                self.value,
300
                self.old_value
301
            )
302
        else:
303 1
            return u'Changed %s to: "%s" (was: "%s")' % (
304
                self.field,
305
                self.value,
306
                self.old_value
307
            )
308
309 1
    @staticmethod
310
    def _get_environment_diffs(container, env, old_env):
311 1
        msg = u'Changed environment "%s" of container "%s" to: "%s"'
312 1
        diffs = []
313 1
        for name, value in env.items():
314 1
            old_value = old_env.get(name)
315 1
            if value != old_value or not old_value:
316 1
                message = msg % (name, container, value)
317 1
                diffs.append(message)
318 1
        return diffs
319
320
321 1
class EcsAction(object):
322 1
    def __init__(self, client, cluster_name, service_name):
323 1
        self._client = client
324 1
        self._cluster_name = cluster_name
325 1
        self._service_name = service_name
326
327 1
        try:
328 1
            if service_name:
329 1
                self._service = self.get_service()
330 1
        except IndexError:
331 1
            raise EcsConnectionError(
332
                u'An error occurred when calling the DescribeServices '
333
                u'operation: Service not found.'
334
            )
335 1
        except ClientError as e:
336 1
            raise EcsConnectionError(str(e))
337 1
        except NoCredentialsError:
338 1
            raise EcsConnectionError(
339
                u'Unable to locate credentials. Configure credentials '
340
                u'by running "aws configure".'
341
            )
342
343 1
    def get_service(self):
344 1
        services_definition = self._client.describe_services(
345
            cluster_name=self._cluster_name,
346
            service_name=self._service_name
347
        )
348 1
        return EcsService(
349
            cluster=self._cluster_name,
350
            service_definition=services_definition[u'services'][0]
351
        )
352
353 1
    def get_current_task_definition(self, service):
354 1
        return self.get_task_definition(service.task_definition)
355
356 1
    def get_task_definition(self, task_definition):
357 1
        task_definition_payload = self._client.describe_task_definition(
358
            task_definition_arn=task_definition
359
        )
360
361 1
        task_definition = EcsTaskDefinition(
362
            **task_definition_payload[u'taskDefinition']
363
        )
364 1
        return task_definition
365
366 1
    def update_task_definition(self, task_definition):
367 1
        response = self._client.register_task_definition(
368
            family=task_definition.family,
369
            containers=task_definition.containers,
370
            volumes=task_definition.volumes,
371
            role_arn=task_definition.role_arn,
372
            additional_properties=task_definition.additional_properties
373
        )
374 1
        new_task_definition = EcsTaskDefinition(**response[u'taskDefinition'])
375 1
        self._client.deregister_task_definition(task_definition.arn)
376 1
        return new_task_definition
377
378 1
    def update_service(self, service):
379 1
        response = self._client.update_service(
380
            cluster=service.cluster,
381
            service=service.name,
382
            desired_count=service.desired_count,
383
            task_definition=service.task_definition
384
        )
385 1
        return EcsService(self._cluster_name, response[u'service'])
386
387 1
    def is_deployed(self, service):
388 1
        if len(service[u'deployments']) != 1:
389 1
            return False
390 1
        running_tasks = self._client.list_tasks(
391
            cluster_name=service.cluster,
392
            service_name=service.name
393
        )
394 1
        if not running_tasks[u'taskArns']:
395 1
            return service.desired_count == 0
396 1
        running_count = self.get_running_tasks_count(
397
            service=service,
398
            task_arns=running_tasks[u'taskArns']
399
        )
400 1
        return service.desired_count == running_count
401
402 1
    def get_running_tasks_count(self, service, task_arns):
403 1
        running_count = 0
404 1
        tasks_details = self._client.describe_tasks(
405
            cluster_name=self._cluster_name,
406
            task_arns=task_arns
407
        )
408 1
        for task in tasks_details[u'tasks']:
409 1
            arn = task[u'taskDefinitionArn']
410 1
            status = task[u'lastStatus']
411 1
            if arn == service.task_definition and status == u'RUNNING':
412 1
                running_count += 1
413 1
        return running_count
414
415 1
    @property
416
    def client(self):
417 1
        return self._client
418
419 1
    @property
420
    def service(self):
421 1
        return self._service
422
423 1
    @property
424
    def cluster_name(self):
425 1
        return self._cluster_name
426
427 1
    @property
428
    def service_name(self):
429 1
        return self._service_name
430
431
432 1
class DeployAction(EcsAction):
433 1
    def deploy(self, task_definition):
434 1
        self._service.set_task_definition(task_definition)
435 1
        return self.update_service(self._service)
436
437
438 1
class ScaleAction(EcsAction):
439 1
    def scale(self, desired_count):
440 1
        self._service.set_desired_count(desired_count)
441 1
        return self.update_service(self._service)
442
443
444 1
class RunAction(EcsAction):
445 1
    def __init__(self, client, cluster_name):
446 1
        super(RunAction, self).__init__(client, cluster_name, None)
447 1
        self._client = client
448 1
        self._cluster_name = cluster_name
449 1
        self.started_tasks = []
450
451 1
    def run(self, task_definition, count, started_by):
452 1
        result = self._client.run_task(
453
            cluster=self._cluster_name,
454
            task_definition=task_definition.family_revision,
455
            count=count,
456
            started_by=started_by,
457
            overrides=dict(containerOverrides=task_definition.get_overrides())
458
        )
459 1
        self.started_tasks = result['tasks']
460 1
        return True
461
462
463 1
class EcsError(Exception):
464 1
    pass
465
466
467 1
class EcsConnectionError(EcsError):
468 1
    pass
469
470
471 1
class UnknownContainerError(EcsError):
472 1
    pass
473
474
475 1
class TaskPlacementError(EcsError):
476 1
    pass
477
478
479 1
class UnknownTaskDefinitionError(EcsError):
480
    pass
481