Completed
Push — develop ( 3e76c8...b2fce6 )
by Fabian
03:37 queued 03:12
created

EcsClient.__init__()   A

Complexity

Conditions 1

Size

Total Lines 7

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 1

Importance

Changes 0
Metric Value
c 0
b 0
f 0
dl 0
loc 7
ccs 3
cts 3
cp 1
rs 9.4285
cc 1
crap 1
1 1
from datetime import datetime
2
3 1
from boto3.session import Session
4 1
from botocore.exceptions import ClientError, NoCredentialsError
5 1
from dateutil.tz.tz import tzlocal
6
7
8 1
VERSION = '1.3.1'
9
10
11 1
class EcsClient(object):
12 1
    def __init__(self, access_key_id=None, secret_access_key=None,
13
                 region=None, profile=None):
14 1
        session = Session(aws_access_key_id=access_key_id,
15
                          aws_secret_access_key=secret_access_key,
16
                          region_name=region,
17
                          profile_name=profile)
18 1
        self.boto = session.client(u'ecs')
19
20 1
    def describe_services(self, cluster_name, service_name):
21 1
        return self.boto.describe_services(
22
            cluster=cluster_name,
23
            services=[service_name]
24
        )
25
26 1
    def describe_task_definition(self, task_definition_arn):
27 1
        try:
28 1
            return self.boto.describe_task_definition(
29
                taskDefinition=task_definition_arn
30
            )
31 1
        except ClientError:
32 1
            raise UnknownTaskDefinitionError(
33
                u'Unknown task definition arn: %s' % task_definition_arn
34
            )
35
36 1
    def list_tasks(self, cluster_name, service_name):
37 1
        return self.boto.list_tasks(
38
            cluster=cluster_name,
39
            serviceName=service_name
40
        )
41
42 1
    def describe_tasks(self, cluster_name, task_arns):
43 1
        return self.boto.describe_tasks(cluster=cluster_name, tasks=task_arns)
44
45 1
    def register_task_definition(self, family, containers, volumes, role_arn,
46
                                 additional_properties):
47 1
        return self.boto.register_task_definition(
48
            family=family,
49
            containerDefinitions=containers,
50
            volumes=volumes,
51
            taskRoleArn=role_arn,
52
            **additional_properties
53
        )
54
55 1
    def deregister_task_definition(self, task_definition_arn):
56 1
        return self.boto.deregister_task_definition(
57
            taskDefinition=task_definition_arn
58
        )
59
60 1
    def update_service(self, cluster, service, desired_count, task_definition):
61 1
        return self.boto.update_service(
62
            cluster=cluster,
63
            service=service,
64
            desiredCount=desired_count,
65
            taskDefinition=task_definition
66
        )
67
68 1
    def run_task(self, cluster, task_definition, count, started_by, overrides):
69 1
        return self.boto.run_task(
70
            cluster=cluster,
71
            taskDefinition=task_definition,
72
            count=count,
73
            startedBy=started_by,
74
            overrides=overrides
75
        )
76
77
78 1
class EcsService(dict):
79 1
    def __init__(self, cluster, service_definition=None, **kwargs):
80 1
        self._cluster = cluster
81 1
        super(EcsService, self).__init__(service_definition, **kwargs)
82
83 1
    def set_desired_count(self, desired_count):
84 1
        self[u'desiredCount'] = desired_count
85
86 1
    def set_task_definition(self, task_definition):
87 1
        self[u'taskDefinition'] = task_definition.arn
88
89 1
    @property
90
    def cluster(self):
91 1
        return self._cluster
92
93 1
    @property
94
    def name(self):
95 1
        return self.get(u'serviceName')
96
97 1
    @property
98
    def task_definition(self):
99 1
        return self.get(u'taskDefinition')
100
101 1
    @property
102
    def desired_count(self):
103 1
        return self.get(u'desiredCount')
104
105 1
    @property
106
    def deployment_created_at(self):
107 1
        for deployment in self.get(u'deployments'):
108 1
            if deployment.get(u'status') == u'PRIMARY':
109 1
                return deployment.get(u'createdAt')
110 1
        return datetime.now()
111
112 1
    @property
113
    def deployment_updated_at(self):
114 1
        for deployment in self.get(u'deployments'):
115 1
            if deployment.get(u'status') == u'PRIMARY':
116 1
                return deployment.get(u'updatedAt')
117 1
        return datetime.now()
118
119 1
    @property
120
    def errors(self):
121 1
        return self.get_warnings(
122
            since=self.deployment_updated_at
123
        )
124
125 1
    @property
126
    def older_errors(self):
127 1
        return self.get_warnings(
128
            since=self.deployment_created_at,
129
            until=self.deployment_updated_at
130
        )
131
132 1
    def get_warnings(self, since=None, until=None):
133 1
        since = since or self.deployment_created_at
134 1
        until = until or datetime.now(tz=tzlocal())
135 1
        errors = {}
136 1
        for event in self.get(u'events'):
137 1
            if u'unable' not in event[u'message']:
138
                continue
139 1
            if since < event[u'createdAt'] < until:
140 1
                errors[event[u'createdAt']] = event[u'message']
141 1
        return errors
142
143
144 1
class EcsTaskDefinition(object):
145 1
    def __init__(self, containerDefinitions, volumes, family, revision,
146
                 status, taskDefinitionArn, requiresAttributes,
147
                 taskRoleArn=None, **kwargs):
148 1
        self.containers = containerDefinitions
149 1
        self.volumes = volumes
150 1
        self.family = family
151 1
        self.revision = revision
152 1
        self.status = status
153 1
        self.arn = taskDefinitionArn
154 1
        self.requires_attributes = requiresAttributes
155 1
        self.role_arn = taskRoleArn or u''
156 1
        self.additional_properties = kwargs
157 1
        self._diff = []
158
159 1
    @property
160
    def container_names(self):
161 1
        for container in self.containers:
162 1
            yield container[u'name']
163
164 1
    @property
165
    def family_revision(self):
166 1
        return '%s:%d' % (self.family, self.revision)
167
168 1
    @property
169
    def diff(self):
170 1
        return self._diff
171
172 1
    def get_overrides(self):
173 1
        override = dict()
174 1
        overrides = []
175 1
        for diff in self.diff:
176 1
            if override.get('name') != diff.container:
177 1
                override = dict(name=diff.container)
178 1
                overrides.append(override)
179 1
            if diff.field == 'command':
180 1
                override['command'] = self.get_overrides_command(diff.value)
181 1
            elif diff.field == 'environment':
182 1
                override['environment'] = self.get_overrides_env(diff.value)
183 1
        return overrides
184
185 1
    @staticmethod
186
    def get_overrides_command(command):
187 1
        return command.split(' ')
188
189 1
    @staticmethod
190
    def get_overrides_env(env):
191 1
        return [{"name": e, "value": env[e]} for e in env]
192
193 1
    def set_images(self, tag=None, **images):
194 1
        self.validate_container_options(**images)
195 1
        for container in self.containers:
196 1
            if container[u'name'] in images:
197 1
                new_image = images[container[u'name']]
198 1
                diff = EcsTaskDefinitionDiff(
199
                    container=container[u'name'],
200
                    field=u'image',
201
                    value=new_image,
202
                    old_value=container[u'image']
203
                )
204 1
                self._diff.append(diff)
205 1
                container[u'image'] = new_image
206 1
            elif tag:
207 1
                image_definition = container[u'image'].rsplit(u':', 1)
208 1
                new_image = u'%s:%s' % (image_definition[0], tag.strip())
209 1
                diff = EcsTaskDefinitionDiff(
210
                    container=container[u'name'],
211
                    field=u'image',
212
                    value=new_image,
213
                    old_value=container[u'image']
214
                )
215 1
                self._diff.append(diff)
216 1 View Code Duplication
                container[u'image'] = new_image
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
217
218 1
    def set_commands(self, **commands):
219 1
        self.validate_container_options(**commands)
220 1
        for container in self.containers:
221 1
            if container[u'name'] in commands:
222 1
                new_command = commands[container[u'name']]
223 1
                diff = EcsTaskDefinitionDiff(
224
                    container=container[u'name'],
225
                    field=u'command',
226
                    value=new_command,
227
                    old_value=container.get(u'command')
228
                )
229 1
                self._diff.append(diff)
230 1
                container[u'command'] = [new_command]
231 View Code Duplication
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
232 1
    def set_environment(self, environment_list):
233 1
        environment = {}
234
235 1
        for env in environment_list:
236 1
            environment.setdefault(env[0], {})
237 1
            environment[env[0]][env[1]] = env[2]
238
239 1
        self.validate_container_options(**environment)
240 1
        for container in self.containers:
241 1
            if container[u'name'] in environment:
242 1
                self.apply_container_environment(
243
                    container=container,
244
                    new_environment=environment[container[u'name']]
245
                )
246
247 1
    def apply_container_environment(self, container, new_environment):
248 1
        environment = container.get('environment', {})
249 1
        old_environment = {env['name']: env['value'] for env in environment}
250 1
        merged = old_environment.copy()
251 1
        merged.update(new_environment)
252
253 1
        if old_environment == merged:
254 1
            return
255
256 1
        diff = EcsTaskDefinitionDiff(
257
            container=container[u'name'],
258
            field=u'environment',
259
            value=merged,
260
            old_value=old_environment
261
        )
262 1
        self._diff.append(diff)
263
264 1
        container[u'environment'] = [
265
            {"name": e, "value": merged[e]} for e in merged
266
        ]
267
268 1
    def validate_container_options(self, **container_options):
269 1
        for container_name in container_options:
270 1
            if container_name not in self.container_names:
271 1
                raise UnknownContainerError(
272
                    u'Unknown container: %s' % container_name
273
                )
274
275 1
    def set_role_arn(self, role_arn):
276 1
        if role_arn:
277 1
            diff = EcsTaskDefinitionDiff(
278
                container=None,
279
                field=u'role_arn',
280
                value=role_arn,
281
                old_value=self.role_arn
282
            )
283 1
            self.role_arn = role_arn
284 1
            self._diff.append(diff)
285
286
287 1
class EcsTaskDefinitionDiff(object):
288 1
    def __init__(self, container, field, value, old_value):
289 1
        self.container = container
290 1
        self.field = field
291 1
        self.value = value
292 1
        self.old_value = old_value
293
294 1
    def __repr__(self):
295 1
        if self.field == u'environment':
296 1
            return '\n'.join(self._get_environment_diffs(
297
                self.container,
298
                self.value,
299
                self.old_value,
300
            ))
301 1
        elif self.container:
302 1
            return u'Changed %s of container "%s" to: "%s" (was: "%s")' % (
303
                self.field,
304
                self.container,
305
                self.value,
306
                self.old_value
307
            )
308
        else:
309 1
            return u'Changed %s to: "%s" (was: "%s")' % (
310
                self.field,
311
                self.value,
312
                self.old_value
313
            )
314
315 1
    @staticmethod
316
    def _get_environment_diffs(container, env, old_env):
317 1
        msg = u'Changed environment "%s" of container "%s" to: "%s"'
318 1
        diffs = []
319 1
        for name, value in env.items():
320 1
            old_value = old_env.get(name)
321 1
            if value != old_value or not old_value:
322 1
                message = msg % (name, container, value)
323 1
                diffs.append(message)
324 1
        return diffs
325
326
327 1
class EcsAction(object):
328 1
    def __init__(self, client, cluster_name, service_name):
329 1
        self._client = client
330 1
        self._cluster_name = cluster_name
331 1
        self._service_name = service_name
332
333 1
        try:
334 1
            if service_name:
335 1
                self._service = self.get_service()
336 1
        except IndexError:
337 1
            raise EcsConnectionError(
338
                u'An error occurred when calling the DescribeServices '
339
                u'operation: Service not found.'
340
            )
341 1
        except ClientError as e:
342 1
            raise EcsConnectionError(str(e))
343 1
        except NoCredentialsError:
344 1
            raise EcsConnectionError(
345
                u'Unable to locate credentials. Configure credentials '
346
                u'by running "aws configure".'
347
            )
348
349 1
    def get_service(self):
350 1
        services_definition = self._client.describe_services(
351
            cluster_name=self._cluster_name,
352
            service_name=self._service_name
353
        )
354 1
        return EcsService(
355
            cluster=self._cluster_name,
356
            service_definition=services_definition[u'services'][0]
357
        )
358
359 1
    def get_current_task_definition(self, service):
360 1
        return self.get_task_definition(service.task_definition)
361
362 1
    def get_task_definition(self, task_definition):
363 1
        task_definition_payload = self._client.describe_task_definition(
364
            task_definition_arn=task_definition
365
        )
366
367 1
        task_definition = EcsTaskDefinition(
368
            **task_definition_payload[u'taskDefinition']
369
        )
370 1
        return task_definition
371
372 1
    def update_task_definition(self, task_definition):
373 1
        response = self._client.register_task_definition(
374
            family=task_definition.family,
375
            containers=task_definition.containers,
376
            volumes=task_definition.volumes,
377
            role_arn=task_definition.role_arn,
378
            additional_properties=task_definition.additional_properties
379
        )
380 1
        new_task_definition = EcsTaskDefinition(**response[u'taskDefinition'])
381 1
        return new_task_definition
382
383 1
    def deregister_task_definition(self, task_definition):
384 1
        self._client.deregister_task_definition(task_definition.arn)
385
386 1
    def update_service(self, service):
387 1
        response = self._client.update_service(
388
            cluster=service.cluster,
389
            service=service.name,
390
            desired_count=service.desired_count,
391
            task_definition=service.task_definition
392
        )
393 1
        return EcsService(self._cluster_name, response[u'service'])
394
395 1
    def is_deployed(self, service):
396 1
        if len(service[u'deployments']) != 1:
397 1
            return False
398 1
        running_tasks = self._client.list_tasks(
399
            cluster_name=service.cluster,
400
            service_name=service.name
401
        )
402 1
        if not running_tasks[u'taskArns']:
403 1
            return service.desired_count == 0
404 1
        running_count = self.get_running_tasks_count(
405
            service=service,
406
            task_arns=running_tasks[u'taskArns']
407
        )
408 1
        return service.desired_count == running_count
409
410 1
    def get_running_tasks_count(self, service, task_arns):
411 1
        running_count = 0
412 1
        tasks_details = self._client.describe_tasks(
413
            cluster_name=self._cluster_name,
414
            task_arns=task_arns
415
        )
416 1
        for task in tasks_details[u'tasks']:
417 1
            arn = task[u'taskDefinitionArn']
418 1
            status = task[u'lastStatus']
419 1
            if arn == service.task_definition and status == u'RUNNING':
420 1
                running_count += 1
421 1
        return running_count
422
423 1
    @property
424
    def client(self):
425 1
        return self._client
426
427 1
    @property
428
    def service(self):
429 1
        return self._service
430
431 1
    @property
432
    def cluster_name(self):
433 1
        return self._cluster_name
434
435 1
    @property
436
    def service_name(self):
437 1
        return self._service_name
438
439
440 1
class DeployAction(EcsAction):
441 1
    def deploy(self, task_definition):
442 1
        try:
443 1
            self._service.set_task_definition(task_definition)
444 1
            return self.update_service(self._service)
445 1
        except ClientError as e:
446 1
            raise EcsError(str(e))
447
448
449 1
class ScaleAction(EcsAction):
450 1
    def scale(self, desired_count):
451 1
        try:
452 1
            self._service.set_desired_count(desired_count)
453 1
            return self.update_service(self._service)
454 1
        except ClientError as e:
455 1
            raise EcsError(str(e))
456
457
458 1
class RunAction(EcsAction):
459 1
    def __init__(self, client, cluster_name):
460 1
        super(RunAction, self).__init__(client, cluster_name, None)
461 1
        self._client = client
462 1
        self._cluster_name = cluster_name
463 1
        self.started_tasks = []
464
465 1
    def run(self, task_definition, count, started_by):
466 1
        try:
467 1
            result = self._client.run_task(
468
                cluster=self._cluster_name,
469
                task_definition=task_definition.family_revision,
470
                count=count,
471
                started_by=started_by,
472
                overrides=dict(containerOverrides=task_definition.get_overrides())
473
            )
474 1
            self.started_tasks = result['tasks']
475 1
            return True
476 1
        except ClientError as e:
477 1
            raise EcsError(str(e))
478
479
480 1
class EcsError(Exception):
481 1
    pass
482
483
484 1
class EcsConnectionError(EcsError):
485 1
    pass
486
487
488 1
class UnknownContainerError(EcsError):
489 1
    pass
490
491
492 1
class TaskPlacementError(EcsError):
493 1
    pass
494
495
496 1
class UnknownTaskDefinitionError(EcsError):
497
    pass
498