Completed
Pull Request — develop (#23)
by
unknown
01:24
created

EcsTaskDefinition.volumes()   A

Complexity

Conditions 1

Size

Total Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1

Importance

Changes 1
Bugs 0 Features 0
Metric Value
dl 0
loc 3
ccs 2
cts 2
cp 1
c 1
b 0
f 0
rs 10
cc 1
crap 1
1 1
from datetime import datetime
2 1
from json import dumps
3
4 1
from boto3.session import Session
5 1
from botocore.exceptions import ClientError, NoCredentialsError
6 1
from dateutil.tz.tz import tzlocal
7
8
9 1
class EcsClient(object):
    """Thin wrapper around a boto3 ``ecs`` client.

    Every method forwards its arguments to the boto3 call of the same name,
    mapping this module's snake_case parameter names onto the keyword names
    used in the boto3 calls, and returns the boto3 response unchanged.
    """

    def __init__(self, access_key_id=None, secret_access_key=None,
                 region=None, profile=None):
        """Open a boto3 session with the given settings and create the client."""
        session_settings = dict(
            aws_access_key_id=access_key_id,
            aws_secret_access_key=secret_access_key,
            region_name=region,
            profile_name=profile,
        )
        self.boto = Session(**session_settings).client(u'ecs')

    def describe_services(self, cluster_name, service_name):
        """Describe a single service of the given cluster."""
        requested = [service_name]
        return self.boto.describe_services(
            cluster=cluster_name, services=requested
        )

    def describe_task_definition(self, task_definition_arn):
        """Describe a task definition.

        Raises:
            UnknownTaskDefinitionError: if the boto3 call fails with a
                ``ClientError``.
        """
        try:
            response = self.boto.describe_task_definition(
                taskDefinition=task_definition_arn
            )
        except ClientError:
            message = u'Unknown task definition arn: %s' % task_definition_arn
            raise UnknownTaskDefinitionError(message)
        return response

    def list_tasks(self, cluster_name, service_name):
        """List the tasks of a service in the given cluster."""
        request = dict(cluster=cluster_name, serviceName=service_name)
        return self.boto.list_tasks(**request)

    def describe_tasks(self, cluster_name, task_arns):
        """Describe the given tasks of a cluster."""
        request = dict(cluster=cluster_name, tasks=task_arns)
        return self.boto.describe_tasks(**request)

    def register_task_definition(self, family, containers, volumes, role_arn):
        """Register a task definition; a falsy ``role_arn`` is sent as ``u''``."""
        if not role_arn:
            role_arn = u''
        return self.boto.register_task_definition(
            family=family,
            containerDefinitions=containers,
            volumes=volumes,
            taskRoleArn=role_arn,
        )

    def deregister_task_definition(self, task_definition_arn):
        """Deregister the given task definition."""
        request = dict(taskDefinition=task_definition_arn)
        return self.boto.deregister_task_definition(**request)

    def update_service(self, cluster, service, desired_count, task_definition):
        """Update the desired count and task definition of a service."""
        request = dict(
            cluster=cluster,
            service=service,
            desiredCount=desired_count,
            taskDefinition=task_definition,
        )
        return self.boto.update_service(**request)

    def run_task(self, cluster, task_definition, count, started_by, overrides):
        """Start ``count`` tasks from a task definition with the given overrides."""
        request = dict(
            cluster=cluster,
            taskDefinition=task_definition,
            count=count,
            startedBy=started_by,
            overrides=overrides,
        )
        return self.boto.run_task(**request)
72
73
74 1
class EcsService(dict):
    """Dict holding an ECS service description, bound to a cluster name.

    The mapping content is the service description itself; the properties
    below are convenience accessors for individual keys.
    """

    def __init__(self, cluster, service_definition=None, **kwargs):
        """Store the cluster name and initialise the dict.

        Args:
            cluster: name of the cluster the service belongs to.
            service_definition: mapping (or iterable of pairs) used to
                populate the dict; ``None`` yields an empty service.
            **kwargs: additional key/value pairs for the dict.
        """
        self._cluster = cluster
        # dict(None) raises TypeError, so the documented default must be
        # replaced by an empty mapping before delegating.
        if service_definition is None:
            service_definition = {}
        super(EcsService, self).__init__(service_definition, **kwargs)

    def set_desired_count(self, desired_count):
        """Set the ``desiredCount`` entry."""
        self[u'desiredCount'] = desired_count

    def set_task_definition(self, task_definition):
        """Point the service at ``task_definition`` (stores its ``arn``)."""
        self[u'taskDefinition'] = task_definition.arn

    @property
    def cluster(self):
        """Cluster name given at construction."""
        return self._cluster

    @property
    def name(self):
        """Value of ``serviceName`` or ``None``."""
        return self.get(u'serviceName')

    @property
    def task_definition(self):
        """Value of ``taskDefinition`` or ``None``."""
        return self.get(u'taskDefinition')

    @property
    def desired_count(self):
        """Value of ``desiredCount`` or ``None``."""
        return self.get(u'desiredCount')

    def _primary_deployment_value(self, key):
        """Return ``key`` of the first deployment whose status is PRIMARY.

        Falls back to the current time when there is no PRIMARY deployment
        (or no ``deployments`` entry at all).
        """
        for deployment in self.get(u'deployments') or []:
            if deployment.get(u'status') == u'PRIMARY':
                return deployment.get(key)
        # Timezone-aware on purpose: get_warnings() compares this value with
        # datetime.now(tz=tzlocal()), and naive/aware datetimes cannot be
        # ordered against each other.
        return datetime.now(tz=tzlocal())

    @property
    def deployment_created_at(self):
        """``createdAt`` of the PRIMARY deployment, or the current time."""
        return self._primary_deployment_value(u'createdAt')

    @property
    def deployment_updated_at(self):
        """``updatedAt`` of the PRIMARY deployment, or the current time."""
        return self._primary_deployment_value(u'updatedAt')

    @property
    def errors(self):
        """Warnings logged after the PRIMARY deployment was last updated."""
        return self.get_warnings(
            since=self.deployment_updated_at
        )

    @property
    def older_errors(self):
        """Warnings logged between creation and last update of the deployment."""
        return self.get_warnings(
            since=self.deployment_created_at,
            until=self.deployment_updated_at
        )

    def get_warnings(self, since=None, until=None):
        """Collect event messages containing ``unable`` in a time window.

        Args:
            since: exclusive lower bound; defaults to
                ``deployment_created_at``.
            until: exclusive upper bound; defaults to the current local time.

        Returns:
            dict mapping each matching event's ``createdAt`` to its
            ``message``. Empty when the service has no ``events`` entry.
        """
        since = since or self.deployment_created_at
        until = until or datetime.now(tz=tzlocal())
        return {
            event[u'createdAt']: event[u'message']
            for event in self.get(u'events') or []
            if u'unable' in event[u'message']
            and since < event[u'createdAt'] < until
        }
138
139
140 1
class EcsTaskDefinition(dict):
    """Dict holding an ECS task definition plus a log of applied changes.

    The ``set_*`` methods mutate the container definitions in place and
    record one ``EcsTaskDefinitionDiff`` per change in ``diff``.
    """

    def __init__(self, task_definition=None, **kwargs):
        """Populate the dict and start with an empty change log.

        Args:
            task_definition: mapping (or iterable of pairs) used to populate
                the dict; ``None`` yields an empty task definition.
            **kwargs: additional key/value pairs for the dict.
        """
        # dict(None) raises TypeError, so replace the default first.
        if task_definition is None:
            task_definition = {}
        super(EcsTaskDefinition, self).__init__(task_definition, **kwargs)
        self._diff = []

    @property
    def containers(self):
        """Value of ``containerDefinitions`` or ``None``."""
        return self.get(u'containerDefinitions')

    @property
    def container_names(self):
        """Generator over the ``name`` of every container definition."""
        for container in self.get(u'containerDefinitions') or []:
            yield container[u'name']

    @property
    def volumes(self):
        """Value of ``volumes`` or ``None``."""
        return self.get(u'volumes')

    @property
    def arn(self):
        """Value of ``taskDefinitionArn`` or ``None``."""
        return self.get(u'taskDefinitionArn')

    @property
    def family(self):
        """Value of ``family`` or ``None``."""
        return self.get(u'family')

    @property
    def role_arn(self):
        """Value of ``taskRoleArn`` or ``None``."""
        return self.get(u'taskRoleArn')

    @property
    def revision(self):
        """Value of ``revision`` or ``None``."""
        return self.get(u'revision')

    @property
    def family_revision(self):
        """``family`` and ``revision`` joined as ``family:revision``."""
        return '%s:%d' % (self.get(u'family'), self.get(u'revision'))

    @property
    def diff(self):
        """List of the changes recorded so far, in application order."""
        return self._diff

    def get_overrides(self):
        """Build one override dict per changed container from the change log.

        Returns:
            list of dicts with a ``name`` key and, when changed, ``command``
            (list of strings) and/or ``environment`` (list of name/value
            dicts). Order follows the first change of each container.
        """
        overrides = []
        by_container = {}
        for diff in self.diff:
            # Changes that are not tied to a container (e.g. the task role)
            # cannot be expressed as a container override.
            if diff.container is None:
                continue
            override = by_container.get(diff.container)
            if override is None:
                # Look the container up by name so that non-consecutive
                # changes of one container end up in a single override.
                override = dict(name=diff.container)
                by_container[diff.container] = override
                overrides.append(override)
            if diff.field == 'command':
                override['command'] = self._as_command_list(diff.value)
            elif diff.field == 'environment':
                override['environment'] = self.get_overrides_env(diff.value)
        return overrides

    @staticmethod
    def _as_command_list(command):
        """Return ``command`` as a list, matching what set_commands stores."""
        if isinstance(command, (list, tuple)):
            return list(command)
        return [command]

    @staticmethod
    def get_overrides_env(env):
        """Convert a ``{name: value}`` mapping to a list of name/value dicts."""
        return [{"name": e, "value": env[e]} for e in env]

    def _record_diff(self, container, field, value, old_value):
        """Append one change entry to the change log."""
        self._diff.append(EcsTaskDefinitionDiff(
            container=container,
            field=field,
            value=value,
            old_value=old_value
        ))

    def set_images(self, tag=None, **images):
        """Change container images.

        Args:
            tag: if given, every container without an explicit entry in
                ``images`` keeps its image name but gets this tag.
            **images: new image per container name.

        Raises:
            UnknownContainerError: if ``images`` names an unknown container.
        """
        self.validate_container_options(**images)
        for container in self.containers or []:
            name = container[u'name']
            if name in images:
                new_image = images[name]
            elif tag:
                # Everything before the last colon is kept as image name.
                image_name = container[u'image'].rsplit(u':', 1)[0]
                new_image = u'%s:%s' % (image_name, tag.strip())
            else:
                continue
            self._record_diff(name, u'image', new_image, container[u'image'])
            container[u'image'] = new_image

    def set_commands(self, **commands):
        """Replace the command of each named container.

        The new command is stored as a single-element list.

        Raises:
            UnknownContainerError: if ``commands`` names an unknown container.
        """
        self.validate_container_options(**commands)
        for container in self.containers or []:
            name = container[u'name']
            if name not in commands:
                continue
            new_command = commands[name]
            self._record_diff(
                name, u'command', new_command, container.get(u'command')
            )
            container[u'command'] = [new_command]

    def set_environment(self, environment_list):
        """Merge environment variables into the named containers.

        Args:
            environment_list: iterable of ``(container, name, value)``
                sequences.

        Raises:
            UnknownContainerError: if an unknown container is referenced.
        """
        environment = {}

        for env in environment_list:
            environment.setdefault(env[0], {})[env[1]] = env[2]

        self.validate_container_options(**environment)
        for container in self.containers or []:
            if container[u'name'] in environment:
                self.apply_container_environment(
                    container=container,
                    new_environment=environment[container[u'name']]
                )

    def apply_container_environment(self, container, new_environment):
        """Merge ``new_environment`` over the container's current variables."""
        current = container.get(u'environment') or []
        old_environment = {env['name']: env['value'] for env in current}
        merged = dict(old_environment)
        merged.update(new_environment)

        self._record_diff(
            container[u'name'], u'environment', merged, old_environment
        )
        container[u'environment'] = self.get_overrides_env(merged)

    def validate_container_options(self, **container_options):
        """Raise UnknownContainerError for keys that are not container names."""
        # container_names is a generator; materialise it once instead of
        # re-walking the definitions for every option.
        known_names = set(self.container_names)
        for container_name in container_options:
            if container_name not in known_names:
                raise UnknownContainerError(
                    u'Unknown container: %s' % container_name
                )

    def set_role_arn(self, role_arn):
        """Set ``taskRoleArn`` and record the change; falsy values are ignored."""
        if role_arn:
            # .get(): a task definition without a role has no taskRoleArn
            # key, which must not abort the change.
            old_role_arn = self.get(u'taskRoleArn')
            self[u'taskRoleArn'] = role_arn
            self._record_diff(None, u'role_arn', role_arn, old_role_arn)
288
289
290 1
class EcsTaskDefinitionDiff(object):
    """Record of one changed field of a task definition.

    ``container`` is the name of the affected container, or a falsy value
    for changes that are not tied to a container.
    """

    def __init__(self, container, field, value, old_value):
        """Store the container name, field name, new value and old value."""
        self.container, self.field = container, field
        self.value, self.old_value = value, old_value

    def __repr__(self):
        """Return a one-line description with both values JSON-encoded."""
        new_json = dumps(self.value)
        old_json = dumps(self.old_value)
        if not self.container:
            return u"Changed %s to: %s (was: %s)" % (
                self.field, new_json, old_json
            )
        return u"Changed %s of container '%s' to: %s (was: %s)" % (
            self.field, self.container, new_json, old_json
        )
311
312
313 1
class EcsAction(object):
    """Base class for operations on one service of an ECS cluster.

    On construction the service description is fetched through ``client``
    when a service name is given; failures are re-raised as
    ``EcsConnectionError``.
    """

    def __init__(self, client, cluster_name, service_name):
        """Store the arguments and load the service if a name was given.

        Args:
            client: object providing the ``EcsClient`` methods used below.
            cluster_name: name of the cluster to operate on.
            service_name: name of the service; a falsy value skips the
                lookup and leaves ``service`` as ``None``.

        Raises:
            EcsConnectionError: if the service is not found, the client
                call fails, or no credentials are available.
        """
        self._client = client
        self._cluster_name = cluster_name
        self._service_name = service_name
        # Always define the attribute so the `service` property works for
        # actions created without a service name.
        self._service = None

        try:
            if service_name:
                self._service = self.get_service()
        except IndexError:
            # get_service() indexes the first entry of an empty result list.
            raise EcsConnectionError(
                u'An error occurred when calling the DescribeServices '
                u'operation: Service not found.'
            )
        except ClientError as e:
            raise EcsConnectionError(str(e))
        except NoCredentialsError:
            raise EcsConnectionError(
                u'Unable to locate credentials. Configure credentials '
                u'by running "aws configure".'
            )

    def get_service(self):
        """Fetch the configured service and wrap it in an ``EcsService``.

        Raises:
            IndexError: if the response contains no services.
        """
        services_definition = self._client.describe_services(
            cluster_name=self._cluster_name,
            service_name=self._service_name
        )
        return EcsService(
            cluster=self._cluster_name,
            service_definition=services_definition[u'services'][0]
        )

    def get_current_task_definition(self, service):
        """Fetch the task definition referenced by ``service``."""
        return self.get_task_definition(service.task_definition)

    def get_task_definition(self, task_definition):
        """Fetch a task definition and wrap it in an ``EcsTaskDefinition``."""
        task_definition_payload = self._client.describe_task_definition(
            task_definition_arn=task_definition
        )
        return EcsTaskDefinition(
            task_definition=task_definition_payload[u'taskDefinition']
        )

    def update_task_definition(self, task_definition):
        """Register ``task_definition`` anew and deregister the given one.

        Returns:
            EcsTaskDefinition built from the registration response.
        """
        response = self._client.register_task_definition(
            family=task_definition.family,
            containers=task_definition.containers,
            volumes=task_definition.volumes,
            role_arn=task_definition.role_arn
        )
        new_task_definition = EcsTaskDefinition(response[u'taskDefinition'])
        # Deregister only after the new definition was registered.
        self._client.deregister_task_definition(task_definition.arn)
        return new_task_definition

    def update_service(self, service):
        """Push the service's desired count and task definition to ECS."""
        response = self._client.update_service(
            cluster=service.cluster,
            service=service.name,
            desired_count=service.desired_count,
            task_definition=service.task_definition
        )
        return EcsService(self._cluster_name, response[u'service'])

    def is_deployed(self, service):
        """Tell whether ``service`` has finished deploying.

        True only when the service has exactly one deployment and the
        number of RUNNING tasks using its task definition equals the
        desired count.
        """
        if len(service[u'deployments']) != 1:
            return False
        running_tasks = self._client.list_tasks(
            cluster_name=service.cluster,
            service_name=service.name
        )
        task_arns = running_tasks[u'taskArns']
        if not task_arns:
            return service.desired_count == 0
        running_count = self.get_running_tasks_count(
            service=service,
            task_arns=task_arns
        )
        return service.desired_count == running_count

    def get_running_tasks_count(self, service, task_arns):
        """Count RUNNING tasks among ``task_arns`` that use the service's task definition."""
        tasks_details = self._client.describe_tasks(
            cluster_name=self._cluster_name,
            task_arns=task_arns
        )
        return sum(
            1 for task in tasks_details[u'tasks']
            if task[u'taskDefinitionArn'] == service.task_definition
            and task[u'lastStatus'] == u'RUNNING'
        )

    @property
    def client(self):
        """Client given at construction."""
        return self._client

    @property
    def service(self):
        """Loaded ``EcsService``, or ``None`` when no service name was given."""
        return self._service

    @property
    def cluster_name(self):
        """Cluster name given at construction."""
        return self._cluster_name

    @property
    def service_name(self):
        """Service name given at construction."""
        return self._service_name
426
427
428 1
class DeployAction(EcsAction):
    """Action that switches the loaded service to another task definition."""

    def deploy(self, task_definition):
        """Set the task definition on the service and push the update."""
        service = self._service
        service.set_task_definition(task_definition)
        return self.update_service(service)
432
433
434 1
class ScaleAction(EcsAction):
    """Action that changes the desired task count of the loaded service."""

    def scale(self, desired_count):
        """Set the desired count on the service and push the update."""
        service = self._service
        service.set_desired_count(desired_count)
        return self.update_service(service)
438
439
440 1
class RunAction(EcsAction):
    """Action that starts one-off tasks in a cluster (no service involved)."""

    def __init__(self, client, cluster_name):
        """Initialise the base action without a service name."""
        super(RunAction, self).__init__(client, cluster_name, None)
        self._client, self._cluster_name = client, cluster_name
        self.started_tasks = []

    def run(self, task_definition, count, started_by):
        """Start ``count`` tasks and remember them in ``started_tasks``.

        The overrides sent with the request are built from the changes
        recorded on ``task_definition``. Always returns ``True``.
        """
        request = dict(
            cluster=self._cluster_name,
            task_definition=task_definition.family_revision,
            count=count,
            started_by=started_by,
            overrides=dict(containerOverrides=task_definition.get_overrides()),
        )
        response = self._client.run_task(**request)
        self.started_tasks = response['tasks']
        return True
457
458
459 1
class EcsError(Exception):
    """Base class of the exceptions defined in this module."""
461
462
463 1
class EcsConnectionError(EcsError):
    """Raised by EcsAction when the service cannot be loaded."""
465
466
467 1
class UnknownContainerError(EcsError):
    """Raised when an option names a container the task definition lacks."""
469
470
471 1
class TaskPlacementError(EcsError):
    """Task placement failure.

    NOTE(review): not raised anywhere in this module; presumably raised by
    callers when tasks cannot be placed - confirm against the call sites.
    """
473
474
475 1
class UnknownTaskDefinitionError(EcsError):
    """Raised by EcsClient.describe_task_definition when the lookup fails."""
477